I suggest you look up what a 'partition' is in Kafka terms; there is
quite a good description at
http://incubator.apache.org/kafka/design.html
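
To make the idea concrete, here is a minimal sketch of a partitioner that
spreads keys over the available partitions. It is not Kafka code: the
KeyPartitioner interface is a hypothetical local stand-in that only copies
the partition(Object, int) signature from your MemberIdPartitioner, so the
example compiles without the Kafka jars, and HashKeyPartitioner and
PartitionerDemo are names I made up. It assumes every message carries a
key. The point is only that a partitioner which always returns the same
index (yours always returns 3) cannot spread messages, and that a valid
index must lie in 0..numberOfPartitions-1.

```java
import java.util.Arrays;

class PartitionerDemo {
    // Hypothetical stand-in for the Partitioner interface in the quoted code.
    interface KeyPartitioner {
        int partition(Object key, int numberOfPartitions);
    }

    // Maps a key to a partition index in [0, numberOfPartitions).
    static class HashKeyPartitioner implements KeyPartitioner {
        @Override
        public int partition(Object key, int numberOfPartitions) {
            // floorMod keeps the result non-negative for negative hash codes.
            return Math.floorMod(key.hashCode(), numberOfPartitions);
        }
    }

    // Counts how many of the keys 0..messages-1 land on each partition.
    static int[] distribute(KeyPartitioner p, int messages, int partitions) {
        int[] counts = new int[partitions];
        for (int i = 0; i < messages; i++) {
            counts[p.partition(Integer.valueOf(i), partitions)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hash-based choice: 300 integer keys over 3 partitions.
        int[] even = distribute(new HashKeyPartitioner(), 300, 3);
        System.out.println(Arrays.toString(even));   // prints [100, 100, 100]

        // Constant choice: every message lands on the same partition.
        int[] skewed = distribute((key, n) -> 0, 300, 3);
        System.out.println(Arrays.toString(skewed)); // prints [300, 0, 0]
    }
}
```

The even split here comes from using consecutive integers as keys; with
real keys the split depends on how their hash codes are distributed.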

On Fri, Sep 14, 2012 at 2:57 AM,  <viswanath_ponn...@dell.com> wrote:
> Hello,
>
> I am working on an experimental project on message distribution and load 
> balancing across a cluster using Apache Kafka and Zookeeper.  The goal of the 
> project is to distribute messages equally across the cluster for concurrent 
> processing.
>
> For example, the server cluster contains 3 servers, namely kafkaserver1, 
> kafkaserver2, and kafkaserver3.  When the producer sends 300 messages to a 
> particular topic (demo), I expect each server to get 100 messages.
>
> The project setup
>
> - Started the Kafka and Zookeeper processes on 3 servers
>
> - Started 3 Consumer clients, each listening for messages, 
> e.g. client1 connects to kafkaserver1, client2 connects to kafkaserver2, 
> client3 connects to kafkaserver3.
>
> - Started a Producer which will push messages to the Zookeeper cluster.
>
> The code as follows:
>
> Sample Producer.java
>
>        System.out.println("ProdMain - starting..");
>        Properties props = new Properties();
>         String broker = 
> "kafkaserver1:2181,kafkaserver2:2181,kafkaserver3:2181";
>
>         props.put("zk.connect", broker);
>         props.put("zk.connectiontimeout.ms", "1000000");
>         props.put("zk.sessiontimeout.ms", "1000000");
>         props.put("partitioner.class", 
> "com.esg.ganges.kafka.MemberIdPartitioner");
>         props.put("serializer.class", StringEncoder.class.getName());
>
>         System.out.println("ProdMain - Initializing..");
>         ProducerConfig config = new ProducerConfig(props);
>         System.out.println("ProdMain - con time: " + 
> config.getZkConnectionTimeoutMs());
>
>         System.out.println("ProdMain - Producer:start");
>         Producer<String, String> producer = new Producer<String, 
> String>(config);
>
>         System.out.println("ProdMain - Creating the data");
>         StringProducerData prodData = new StringProducerData("demo");
>         System.out.println("ProdMain - Start sending messages...");
>         try {
>             long start = System.currentTimeMillis();
>             prodData.add("Hello world");
>             for (int i = 0; i < Integer.MAX_VALUE; i++) {
>                 producer.send(prodData);
>             }
>             long cost = System.currentTimeMillis() - start;
>             System.out.println("send message cost: "+cost+" ms");
>         } finally {
>             producer.close();
>         }
>
>         System.out.println("ProdMain - End");
>
>
> Consumer.java
>         Properties props = new Properties();
>         String broker = args[0]+":2181";
>         System.out.println("Connecting to the Server:" + broker);
>
>         props.put("zk.connect", broker);
>         props.put("groupid", "test_group");
>         //
>         ConsumerConfig consumerConfig = new ConsumerConfig(props);
>         ConsumerConnector connector = Consumer.create(consumerConfig);
>         //
>         Map<String, List<MessageStream<String>>> topicMessageStreams = 
> connector.createMessageStreams(ImmutableMap.of("demo", 2), new 
> StringDecoder());
>         List<MessageStream<String>> streams = topicMessageStreams.get("demo");
>         //
>         ExecutorService executor = Executors.newFixedThreadPool(2);
>         final AtomicInteger count = new AtomicInteger();
>         for (final MessageStream<String> stream : streams) {
>             executor.submit(new Runnable() {
>
>                 public void run() {
>                     for (String message : stream) {
>                         System.out.println(count.incrementAndGet() + " => " + 
> message);
>                     }
>                 }
>             });
>         }
>         //
>         System.out.println("Connected to the broker:" + broker
>                 + ", waiting for messages..");
>         executor.awaitTermination(1, TimeUnit.HOURS);
>     }
>
> public class MemberIdPartitioner implements Partitioner{
>       public MemberIdPartitioner() {
>             System.out.println("Initialized: MemberIdPartitioner..");
>       }
>
>       @Override
>       public int partition(Object arg0, int numberOfPartitions) {
>             System.out.println("Type: " + arg0.getClass().getName());
>             // TODO Auto-generated method stub
>             return 3;
>       }
>
> }
>
>
> The behavior I see in this implementation is that all the messages are 
> consumed by a single consumer. When that consumer is shut down, the next 
> consumer becomes active and starts consuming messages. My experiment is meant 
> to distribute messages equally across all three servers. Please let me 
> know if I am doing something wrong.
>
> Thank you,
> Vish
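
The symptom described above (one consumer active, the next one taking over
when it stops) is what you would expect if the topic effectively has a
single partition, because within one consumer group each partition is read
by exactly one consumer. Whether that is really the cause here I cannot
tell from the message alone. The toy model below is not Kafka's rebalance
code; GroupAssignmentDemo and assign are hypothetical names, and the
round-robin rule is a simplification chosen only to show that consumers
beyond the partition count stay idle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupAssignmentDemo {
    // Simplified model: partition p is owned by consumer (p mod group size).
    // Every partition has exactly one owner within the group.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitions) {
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        for (String c : consumers) {
            owned.put(c, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return owned;
        }
        for (int p = 0; p < partitions; p++) {
            owned.get(consumers.get(p % consumers.size())).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList("client1", "client2", "client3");

        // One partition: only the first consumer receives anything.
        System.out.println(assign(all, 1));
        // prints {client1=[0], client2=[], client3=[]}

        // client1 shuts down: the partition moves to the next consumer.
        System.out.println(assign(Arrays.asList("client2", "client3"), 1));
        // prints {client2=[0], client3=[]}

        // Three partitions: each consumer owns one.
        System.out.println(assign(all, 3));
        // prints {client1=[0], client2=[1], client3=[2]}
    }
}
```

In this model the group only shares the load once there are at least as
many partitions as consumers and the producer actually writes to all of
them.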
