>> Therefore it appears that without the resolution of 242, the consumer.timeout.ms parameter would not work.
Please can you elaborate on this ? Do you have some test code we can look at ? Thanks, Neha On Wed, Mar 7, 2012 at 5:19 PM, Milind Parikh <milindpar...@gmail.com> wrote: > I introduced a sleep to let rebalance make place to account for 242 before > new createmessagestream. However to no avail. > > Therefore it appears that without the resolution of 242, the > consumer.timeout.ms parameter would not work. > > Thoughts/workaround? > > My current workaround is inside the while(true){}; but not ideal as I have > to make some counter to increase and check counter at every iteration of > while(it.next()). > > Regards > Milind > > > On Mon, Mar 5, 2012 at 9:29 AM, Neha Narkhede <neha.narkh...@gmail.com>wrote: > >> You are probably hitting this bug in Kafka - >> https://issues.apache.org/jira/browse/KAFKA-242 >> >> Thanks, >> Neha >> >> 2012/3/4 <zlai_2...@sina.com>: >> > I modify code as beblow. When the program createmessagestream again, it >> can not get any message although there are some new messages. How could I >> resolve it? Thanks! >> > >> > ConsumerConnector consumer = >> kafka.consumer.Consumer.createJavaConsumerConnector(consumerconfig); >> > Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); >> > topicCountMap.put("topic", new Integer(1)); >> > while(true){ >> > Map<String, List<KafkaMessageStream<Message>>> consumerMap = >> consumer.createMessageStreams(topicCountMap); >> > KafkaMessageStream<Message> stream = consumerMap.get("topic").get(0); >> > ConsumerIterator<Message> it = stream.iterator(); >> > try{ >> > while(it.hasNext()) >> > { >> > ByteBuffer buffer = it.next().payload(); >> > byte [] bytes = new byte[buffer.remaining()]; >> > buffer.get(bytes); >> > System.out.println(new String(bytes)); >> > } >> > } >> > catch (ConsumerTimeoutException e){ >> > e.printStackTrace(); >> > } >> > catch (Exception e){ >> > e.printStackTrace(); >> > } >> > } >> > >> > >> > >> > >>