Hi, I'm using the kafka-0.6 library with Spring in a web application that is intended to read from and write to a queue. The Java class is shown below:
=============================================================================
package xxx.xxxx.handlers;

import xxx.xxxx.kafka.builders.Builder;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaMessageStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.message.Message;
import org.slf4j.Logger;

import java.io.*;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * {@link QueueHandler} implementation that writes to and reads from a single
 * Kafka topic.
 *
 * <p>The producer and the consumer connector are created in {@link #init()}
 * from the properties injected through the setters, so {@code init()} has to
 * run after the setters and before {@link #enqueue(Map)} or
 * {@link #dequeue()} is used.
 */
public class DefaultQueueHandler implements QueueHandler {

    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(DefaultQueueHandler.class);

    private Producer<String, Serializable> producer;
    private ConsumerConnector consumer;
    private ConsumerIterator it;
    private Properties consumerProperties;
    private Properties producerProperties;
    private String messageKey;
    private String topic = "topic";

    /**
     * Builds the producer and the consumer connector from the configured
     * properties, requests one message stream for {@code topic} and keeps
     * that stream's iterator for later {@link #dequeue()} calls.
     */
    public void init() {
        // Parameterized form: the message is only assembled when debug is enabled.
        LOGGER.debug("INIT{}", this);
        producer = Builder.buildProducer(producerProperties);
        consumer = Builder.buildConsumerConnector(consumerProperties);

        // Ask for exactly one stream on the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(1));

        Map<String, List<KafkaMessageStream>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaMessageStream stream = consumerMap.get(topic).get(0);
        it = stream.iterator();
    }

    /**
     * Sends the value stored under {@code messageKey} in {@code message} to
     * the topic. Only that single entry is sent, not the whole map.
     *
     * <p>NOTE(review): {@link #dequeue()} deserializes each payload as a
     * {@code Map<String, Serializable>}; presumably the value under
     * {@code messageKey} (or the producer's configured encoder) yields such a
     * map - confirm against the producer configuration.
     *
     * @param message map holding the payload under {@code messageKey}
     */
    public void enqueue(Map<String, Serializable> message) {
        producer.send(new ProducerData<String, Serializable>(topic, message.get(messageKey)));
    }

    /**
     * Returns the next message from the stream, deserialized into a map.
     *
     * <p>NOTE(review): this call has been observed to suspend inside
     * {@code it.hasNext()} without any error. Presumably the Kafka consumer
     * iterator blocks until a message arrives unless a consumer timeout
     * (e.g. {@code consumer.timeout.ms}) is set in the consumer properties -
     * confirm against the Kafka consumer documentation, and avoid calling
     * this from a thread that must not block.
     *
     * @return the deserialized message, or {@code null} when the iterator
     *         reports that no further message is available
     * @throws ClassNotFoundException if a class in the serialized payload
     *         cannot be resolved
     * @throws IOException if the payload cannot be read as a serialized object
     */
    public Map<String, Serializable> dequeue() throws ClassNotFoundException, IOException {
        if (it.hasNext()) {
            return getMapFromMessage(it.next());
        }
        return null;
    }

    /**
     * Copies the remaining bytes of the message payload and deserializes
     * them with Java object serialization.
     *
     * <p>SECURITY: native Java deserialization must only be applied to
     * payloads from trusted producers; if the topic can receive untrusted
     * data, switch to a data-only format or restrict the accepted classes.
     *
     * @param message Kafka message whose payload holds a serialized map
     * @return the deserialized map
     * @throws IOException if the stream header or contents are invalid
     * @throws ClassNotFoundException if a serialized class cannot be resolved
     */
    // The payload is expected to be a Map<String, Serializable>; erasure makes
    // this impossible to verify at runtime, hence the narrowly scoped suppression.
    @SuppressWarnings("unchecked")
    private Map<String, Serializable> getMapFromMessage(Message message)
            throws IOException, ClassNotFoundException {
        ByteBuffer buffer = message.payload();
        byte[] dataBytes = new byte[buffer.remaining()];
        buffer.get(dataBytes);

        ObjectInput oi = new ObjectInputStream(new ByteArrayInputStream(dataBytes));
        try {
            return (Map<String, Serializable>) oi.readObject();
        } finally {
            // Release the stream even when deserialization fails.
            oi.close();
        }
    }

    /** @param consumerProperties properties used to build the consumer connector in {@link #init()} */
    public void setConsumerProperties(Properties consumerProperties) {
        this.consumerProperties = consumerProperties;
    }

    /** @param producerProperties properties used to build the producer in {@link #init()} */
    public void setProducerProperties(Properties producerProperties) {
        this.producerProperties = producerProperties;
    }

    /** @param messageKey key under which {@link #enqueue(Map)} looks up the value to send */
    public void setMessageKey(String messageKey) {
        this.messageKey = messageKey;
    }

    /** @param topic Kafka topic to produce to and consume from (defaults to {@code "topic"}) */
    public void setTopic(String topic) {
        this.topic = topic;
    }
}
=============================================================================
The init() method is called on system start up (soon after the war file is deployed on tomcat). The flow suspends at "it.hasNext()" of the dequeue() method and no errors are thrown. However the code runs fine in a stand-alone application (without tomcat). What should be the issue? Please help. thanks, Shyarmal.