Hi, I'm using Tomcat 6.0.35 and Java 1.7.0_04.
thanks. On Fri, Jun 8, 2012 at 1:51 PM, Danuka Aluthge <danu...@hsenidmobile.com>wrote: > hi, > > I'm using kafka-0.6 library with spring for a web application which is > intended to read and write from a queue. The java class is as below; > > > > ============================================================================= > package xxx.xxxx.handlers; > > import xxx.xxxx.kafka.builders.Builder; > import kafka.consumer.ConsumerIterator; > import kafka.consumer.KafkaMessageStream; > import kafka.javaapi.consumer.ConsumerConnector; > import kafka.javaapi.producer.Producer; > import kafka.javaapi.producer.ProducerData; > import kafka.message.Message; > import org.slf4j.Logger; > > import java.io.*; > import java.nio.ByteBuffer; > import java.util.HashMap; > import java.util.List; > import java.util.Map; > import java.util.Properties; > > /** > * > */ > public class DefaultQueueHandler implements QueueHandler { > > private Producer<String, Serializable> producer; > private ConsumerConnector consumer; > private ConsumerIterator it; > private Properties consumerProperties; > private Properties producerProperties; > private String messageKey; > private String topic = "topic"; > private static final Logger LOGGER = > org.slf4j.LoggerFactory.getLogger(DefaultQueueHandler.class); > > public void init() { > LOGGER.debug("INIT" + this); > producer = Builder.buildProducer(producerProperties); > consumer = Builder.buildConsumerConnector(consumerProperties); > Map<String, Integer> topicCountMap = new HashMap<String, > Integer>(); > topicCountMap.put(topic, new Integer(1)); > Map<String, List<KafkaMessageStream>> consumerMap = > consumer.createMessageStreams(topicCountMap); > KafkaMessageStream stream = consumerMap.get(topic).get(0); > it = stream.iterator(); > } > > public void enqueue(Map<String, Serializable> message) { > producer.send(new ProducerData<String, Serializable>(topic, > message.get(messageKey))); > } > > public Map<String, Serializable> dequeue() throws > 
ClassNotFoundException, IOException { > if (it.hasNext()) { > return getMapFromMessage(it.next()); > } > return null; > } > > private Map<String, Serializable> getMapFromMessage(Message message) > throws IOException, ClassNotFoundException { > ByteBuffer buffer = message.payload(); > byte[] dataBytes = new byte[buffer.remaining()]; > buffer.get(dataBytes); > ByteArrayInputStream bais = new ByteArrayInputStream(dataBytes); > ObjectInput oi = new ObjectInputStream(bais); > return (Map<String, Serializable>) oi.readObject(); > } > > public void setConsumerProperties(Properties consumerProperties) { > this.consumerProperties = consumerProperties; > } > > public void setProducerProperties(Properties producerProperties) { > this.producerProperties = producerProperties; > } > > public void setMessageKey(String messageKey) { > this.messageKey = messageKey; > } > > public void setTopic(String topic) { > this.topic = topic; > } > } > > > ============================================================================= > > The init() method is called on system start up (soon after the war file is > deployed on tomcat). The flow suspends at "it.hasNext()" of the dequeue() > method and no errors are thrown. However the code runs fine in a > stand-alone application (without tomcat). > > What should be the issue? Please help. > > > thanks, > Shyarmal. >