I am working on an experimental project on message distribution and load
balancing across a cluster using Apache Kafka and Zookeeper. The goal of the
project is to distribute messages equally across the cluster for concurrent
processing.

For example, the cluster contains 3 servers: kafkaserver1, kafkaserver2, and
kafkaserver3. When the producer sends 300 messages to a particular topic
(demo), I expect each server to receive 100 messages.
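
To make that expectation concrete, here is a minimal sketch of the arithmetic in plain Java. It does not touch the Kafka API at all; the class name RoundRobinDemo and the method distribute are hypothetical and only model a round-robin assignment of messages to servers.

```java
import java.util.Arrays;

public class RoundRobinDemo {
    // Assigns message i to server (i % serverCount) and returns the per-server totals.
    public static int[] distribute(int messageCount, int serverCount) {
        int[] perServer = new int[serverCount];
        for (int i = 0; i < messageCount; i++) {
            perServer[i % serverCount]++;
        }
        return perServer;
    }

    public static void main(String[] args) {
        // 300 messages over 3 servers
        System.out.println(Arrays.toString(distribute(300, 3))); // prints [100, 100, 100]
    }
}
```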

The project setup:

- Started the Kafka and Zookeeper processes on all 3 servers.

- Started 3 consumer clients listening for messages, one per server:
  client1 connects to kafkaserver1, client2 connects to kafkaserver2, and
  client3 connects to kafkaserver3.

- Started the producer, which pushes messages to the Zookeeper cluster.

The code is as follows:

Sample Producer.java

        System.out.println("ProdMain - starting..");
       Properties props = new Properties();
        String broker = "kafkaserver1:2181,kafkaserver2:2181,kafkaserver3:2181";

        props.put("zk.connect", broker);
        props.put("zk.connectiontimeout.ms", "1000000");
        props.put("zk.sessiontimeout.ms", "1000000");
        props.put("serializer.class", StringEncoder.class.getName());

        System.out.println("ProdMain - Initializing..");
        ProducerConfig config = new ProducerConfig(props);
        System.out.println("ProdMain - con time: " + 

        System.out.println("ProdMain - Producer:start");
        Producer<String, String> producer = new Producer<String, String>(config);

        System.out.println("ProdMain - Creating the data");
        StringProducerData prodData = new StringProducerData("demo");
        System.out.println("ProdMain - Start sending messages...");
        try {
            long start = System.currentTimeMillis();
            prodData.add("Hello world");
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                producer.send(prodData);
            }
            long cost = System.currentTimeMillis() - start;
            System.out.println("send message cost: "+cost+" ms");
        } finally {
            producer.close();
        }

        System.out.println("ProdMain - End");

Sample Consumer.java

        Properties props = new Properties();
        String broker = args[0]+":2181";
        System.out.println("Connecting to the Server:" + broker);

        props.put("zk.connect", broker);
        props.put("groupid", "test_group");
        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector connector = Consumer.create(consumerConfig);
        Map<String, List<MessageStream<String>>> topicMessageStreams =
                connector.createMessageStreams(ImmutableMap.of("demo", 2), new StringDecoder());
        List<MessageStream<String>> streams = topicMessageStreams.get("demo");
        ExecutorService executor = Executors.newFixedThreadPool(2);
        final AtomicInteger count = new AtomicInteger();
        for (final MessageStream<String> stream : streams) {
            executor.submit(new Runnable() {

                public void run() {
                    // Print a running count next to each message read from this stream
                    for (String message : stream) {
                        System.out.println(count.incrementAndGet() + " => " + message);
                    }
                }
            });
        }
        System.out.println("Connected to the broker:" + broker + ", waiting for messages");
        executor.awaitTermination(1, TimeUnit.HOURS);

Sample MemberIdPartitioner.java

public class MemberIdPartitioner implements Partitioner {
      public MemberIdPartitioner() {
            System.out.println("Initialized: MemberIdPartitioner..");
      }

      public int partition(Object arg0, int numberOfPartitions) {
            System.out.println("Type: " + arg0.getClass().getName());
            // TODO Auto-generated method stub
            // Returns the same partition index for every key
            return 3;
      }
}

The behavior I see with this implementation is that all the messages are
consumed by a single consumer. When that consumer is shut down, the next
consumer becomes active and starts consuming messages. My goal is to
distribute messages equally across all three servers. Please let me know if I
am doing something wrong.
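
For comparison, and without claiming this explains what I see, the sketch below contrasts a partition function that always returns 3 (as MemberIdPartitioner does) with one that derives the index from the key. It is plain Java and does not implement the Kafka Partitioner interface; the class PartitionSketch, its method names, the integer keys, and the assumption of 3 partitions are all hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

public class PartitionSketch {
    // Mirrors MemberIdPartitioner: the same index for every key.
    public static int constantPartition(Object key, int numberOfPartitions) {
        return 3;
    }

    // Hypothetical alternative: index derived from the key, always in [0, numberOfPartitions).
    public static int hashPartition(Object key, int numberOfPartitions) {
        return Math.floorMod(key.hashCode(), numberOfPartitions);
    }

    // Counts how many of the integer keys 0..messages-1 land on each partition index.
    public static Map<Integer, Integer> countByPartition(boolean useHash, int messages, int partitions) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int key = 0; key < messages; key++) {
            int p = useHash ? hashPartition(key, partitions) : constantPartition(key, partitions);
            counts.merge(p, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countByPartition(false, 300, 3)); // prints {3=300}
        System.out.println(countByPartition(true, 300, 3));  // prints {0=100, 1=100, 2=100}
    }
}
```

With the constant function every message maps to one index, and with 3 partitions that index (3) falls outside the range 0 to 2. The key-based function spreads these 300 keys evenly.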

Thank you,
