I introduced a sleep to let rebalance make place to account for 242 before new createmessagestream. However to no avail.
Therefore it appears that without the resolution of 242, the consumer.timeout.ms parameter would not work. Thoughts/workaround? My current workaround is inside the while(true){}; but not ideal as I have to make some counter to increase and check counter at every iteration of while(it.next()). Regards Milind On Mon, Mar 5, 2012 at 9:29 AM, Neha Narkhede <neha.narkh...@gmail.com>wrote: > You are probably hitting this bug in Kafka - > https://issues.apache.org/jira/browse/KAFKA-242 > > Thanks, > Neha > > 2012/3/4 <zlai_2...@sina.com>: > > I modify code as beblow. When the program createmessagestream again, it > can not get any message although there are some new messages. How could I > resolve it? Thanks! > > > > ConsumerConnector consumer = > kafka.consumer.Consumer.createJavaConsumerConnector(consumerconfig); > > Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); > > topicCountMap.put("topic", new Integer(1)); > > while(true){ > > Map<String, List<KafkaMessageStream<Message>>> consumerMap = > consumer.createMessageStreams(topicCountMap); > > KafkaMessageStream<Message> stream = consumerMap.get("topic").get(0); > > ConsumerIterator<Message> it = stream.iterator(); > > try{ > > while(it.hasNext()) > > { > > ByteBuffer buffer = it.next().payload(); > > byte [] bytes = new byte[buffer.remaining()]; > > buffer.get(bytes); > > System.out.println(new String(bytes)); > > } > > } > > catch (ConsumerTimeoutException e){ > > e.printStackTrace(); > > } > > catch (Exception e){ > > e.printStackTrace(); > > } > > } > > > > > > > > >