I have a requirement to prove that a Kafka producer can produce 1 million
events/second to a Kafka cluster.

So far, the best I could achieve is 200k events/sec on a topic with 2
partitions. Latency increases as I add more partitions, so I want to test
with 2 partitions for now.
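
To put the gap in numbers, here is a rough back-of-the-envelope sketch. It assumes load spreads evenly over the partitions and counts only the raw payload string used in the producer code in this message; keys, protocol overhead, and compression are ignored. The class and method names are made up for illustration.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for sizing the target; not part of the test producer.
final class TargetRate {
    private TargetRate() {}

    // Events/sec each partition must absorb if load is spread evenly.
    static long perPartitionRate(long targetEventsPerSec, int partitions) {
        return targetEventsPerSec / partitions;
    }

    // Raw payload bytes/sec, ignoring keys, headers, and compression.
    static long payloadBytesPerSecond(long targetEventsPerSec, String payload) {
        return targetEventsPerSec * payload.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        long target = 1_000_000L;   // required events/sec
        long current = 200_000L;    // best rate measured so far
        int partitions = 2;
        String payload = "TEST KAFKA PERFORMANCE";

        System.out.println("Per-partition rate: " + perPartitionRate(target, partitions));
        System.out.println("Payload bytes/sec:  " + payloadBytesPerSecond(target, payload));
        System.out.println("Speed-up needed:    " + (double) target / current);
    }
}
```

With the numbers in this post, that is 500k events/sec per partition, about 22 MB/s of raw payload, and a 5x improvement over what I measure today.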

Below are the details along with the producer code (Java). How can I
produce 1 million events/sec? I have also gone through the Kafka benchmarking blog:
https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

*Kafka cluster:* 3 brokers on 3 servers. Each server has 16 TB (16 JBODs) and
64GB RAM.
*Broker:* Allocated 6GB, 16 io.threads, 8 network threads.
*Topic:* 2 partitions, replication factor of 1 (Get high latency)
*Zookeepers:* 3 zk instances running individually on master nodes (not
co-located with kafka brokers/servers)


*Producer Code:*
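import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.log4j.Logger;

public class TestProducer {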

    private static String msg = "TEST KAFKA PERFORMANCE";
    private static Logger LOG = Logger.getLogger(TestProducer.class);

    public static void main(String... args){
        System.out.println("START - Test Producer");

        long messageCount = Long.parseLong(args[0]);
        long messageCountForStat = Long.parseLong(args[0]);
        String topic = args[1];
        String brokerList = args[2];
        int batchCount = Integer.parseInt(args[3]);
        int topicPartions = Integer.parseInt(args[4]);
        Producer<String, String> producer = getProducer(brokerList, batchCount);
        Date startTime = new Date(System.currentTimeMillis());
        Random rnd = new Random();
        String partition = "";
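        // Produce messages, rotating the key across the partitions.
        while (messageCount > 0) {
            // Take the modulo on the long value; casting to int first can overflow.
            partition = Long.toString(messageCount % topicPartions);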
            KeyedMessage<String, String> message =
                    new KeyedMessage<String, String>(topic, partition, msg);
            producer.send(message);
            messageCount--;
        }

        Date endTime = new Date(System.currentTimeMillis());
        System.out.println("#########################################");
        System.out.println("MESSAGES SENT: " + messageCountForStat);
        System.out.println("START TIME: " + startTime);
        System.out.println("END TIME: " + endTime);
        System.out.println("#########################################");
        System.out.println("END - Test Producer");
    }

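    public static Producer<String, String> getProducer(String brokerList, int batchSize) {
        // Producer configuration; props was never declared in the original.
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);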
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "com.my.SimplePartitioner");
        props.put("request.required.acks", "0");
        props.put("producer.type", "async");
        props.put("compression.codec", "snappy");
        props.put("batch.num.messages", Integer.toString(batchSize));

        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);
        return producer;
    }

}
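
The program above prints only start and end timestamps, so the rate has to be worked out by hand. A minimal sketch of how the events/sec figure could be computed directly is below. ThroughputStats and eventsPerSecond are hypothetical names, and the loop in main is only a stand-in for the real send loop plus the wait for the async queue to drain.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for reporting throughput; not part of TestProducer.
final class ThroughputStats {
    private ThroughputStats() {}

    // Events per second for `events` sent over `elapsedNanos` nanoseconds.
    static double eventsPerSecond(long events, long elapsedNanos) {
        if (elapsedNanos <= 0) {
            throw new IllegalArgumentException("elapsedNanos must be positive");
        }
        return events * (double) TimeUnit.SECONDS.toNanos(1) / elapsedNanos;
    }

    public static void main(String[] args) {
        long events = 1_000_000L;

        // nanoTime is monotonic, so it suits interval timing better than wall-clock dates.
        long start = System.nanoTime();
        long sink = 0;
        for (long i = 0; i < events; i++) {
            sink += i; // stand-in for producer.send(message)
        }
        long elapsed = Math.max(1L, System.nanoTime() - start);

        System.out.println("checksum: " + sink);
        System.out.printf("events/sec: %.0f%n", eventsPerSecond(events, elapsed));
    }
}
```

For example, 1,000,000 messages in 5 seconds gives eventsPerSecond(1_000_000L, 5_000_000_000L) = 200000.0, which matches the 200k events/sec I am seeing.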
