The ZK-based producer has a bug that may not send events to all brokers. Try setting broker.list in the producer instead.
Thanks, Jun On Fri, Sep 14, 2012 at 2:57 AM, <viswanath_ponn...@dell.com> wrote: > Hello, > > I am working on experimental project on message distribution and load > balancing across cluster using Apache Kafka and Zookeeper. The goal of the > project is to equally distribute the messages to the cluster for concurrent > processing. > > For example; the server cluster contains 3 servers namely kafkaserver1, > kafkaserver2, kafkaserver3. When the producer sends the 300 number of > messages to particular topic (demo), I expect each servers should get 100 > messages each. > > The project setup > > - Started Kafka and Zookeeper process on 3 servers > > - Started 3 Consumer client connections and listening for > messages, ex: client 1 connects to Kafkaserver1, client2 connects to > Kafkaserver2, client 3 connected to Kafkaserver3. > > - Started Producer which will push messages to Zookeeper cluster. > > The code as follows: > > Sample Producer.java > > System.out.println("ProdMain - starting.."); > Properties props = new Properties(); > String broker = > "kafkaserver1:2181,kafkaserver2:2181,kafkaserver3:2181"; > > props.put("zk.connect", broker); > props.put("zk.connectiontimeout.ms", "1000000"); > props.put("zk.sessiontimeout.ms", "1000000"); > props.put("partitioner.class", > "com.esg.ganges.kafka.MemberIdPartitioner"); > props.put("serializer.class", StringEncoder.class.getName()); > > System.out.println("ProdMain - Initializing.."); > ProducerConfig config = new ProducerConfig(props); > System.out.println("ProdMain - con time: " + > config.getZkConnectionTimeoutMs()); > > System.out.println("ProdMain - Producer:start"); > Producer<String, String> producer = new Producer<String, > String>(config); > > System.out.println("ProdMain - Creating the data"); > StringProducerData prodData = new StringProducerData("demo"); > System.out.println("ProdMain - Start sending messages..."); > try { > long start = System.currentTimeMillis(); > prodData.add("Hello world"); > for (int i = 0; i < Integer.MAX_VALUE; i++) { > producer.send(prodData); > } > long cost = System.currentTimeMillis() - start; > System.out.println("send message cost: "+cost+" ms"); > } finally { > producer.close(); > } > > System.out.println("ProdMain - End"); > > > Consumer.java > Properties props = new Properties(); > String broker = args[0]+":2181"; > System.out.println("Connecting to the Server:" + broker); > > props.put("zk.connect", broker); > props.put("groupid", "test_group"); > // > ConsumerConfig consumerConfig = new ConsumerConfig(props); > ConsumerConnector connector = Consumer.create(consumerConfig); > // > Map<String, List<MessageStream<String>>> topicMessageStreams = > connector.createMessageStreams(ImmutableMap.of("demo", 2), new > StringDecoder()); > List<MessageStream<String>> streams = > topicMessageStreams.get("demo"); > // > ExecutorService executor = Executors.newFixedThreadPool(2); > final AtomicInteger count = new AtomicInteger(); > for (final MessageStream<String> stream : streams) { > executor.submit(new Runnable() { > > public void run() { > for (String message : stream) { > System.out.println(count.incrementAndGet() + " => > " + message); > } > } > }); > } > // > System.out.println("Connected to the broker:" + broker + ", > waiting for messages.."); > executor.awaitTermination(1, TimeUnit.HOURS); > } > > public class MemberIdPartitioner implements Partitioner{ > public MemberIdPartitioner() { > System.out.println("Initialized: MemberIdPartitioner.."); > } > > @Override > public int partition(Object arg0, int numberOfPartitions) { > System.out.println("Type: " + arg0.getClass().getName()); > // TODO Auto-generated method stub > return 3; > } > > } > > > The behavior I see in this implementation is all the messages been > consumed by only single consumer. When one of the consumer is shutdown; the > next consumer gets activated and see messages consumed. My experiment is to > distribute messages equally across all the three servers. Please do let me > know if I am doing something wrong. > > Thank you, > Vish >