I introduced a sleep to let rebalance make place to account for 242 before
new createmessagestream. However to no avail.

Therefore it appears that without the resolution of 242, the
consumer.timeout.ms parameter would not work.

Thoughts/workaround?

My current workaround is inside the while(true){}; but not ideal as I have
to make some counter to increase and check counter at every iteration of
while(it.next()).

Regards
Milind


On Mon, Mar 5, 2012 at 9:29 AM, Neha Narkhede <neha.narkh...@gmail.com>wrote:

> You are probably hitting this bug in Kafka -
> https://issues.apache.org/jira/browse/KAFKA-242
>
> Thanks,
> Neha
>
> 2012/3/4  <zlai_2...@sina.com>:
> > I modify code as beblow. When the program createmessagestream again, it
> can not get any message although there are some new messages. How could I
> resolve it? Thanks!
> >
> > ConsumerConnector consumer =
> kafka.consumer.Consumer.createJavaConsumerConnector(consumerconfig);
> > Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > topicCountMap.put("topic", new Integer(1));
> > while(true){
> > Map<String, List<KafkaMessageStream<Message>>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
> > KafkaMessageStream<Message> stream = consumerMap.get("topic").get(0);
> > ConsumerIterator<Message> it = stream.iterator();
> > try{
> > while(it.hasNext())
> > {
> > ByteBuffer buffer = it.next().payload();
> > byte [] bytes = new byte[buffer.remaining()];
> > buffer.get(bytes);
> > System.out.println(new String(bytes));
> > }
> > }
> > catch (ConsumerTimeoutException e){
> >     e.printStackTrace();
> >  }
> >  catch (Exception e){
> >   e.printStackTrace();
> >  }
> > }
> >
> >
> >
> >
>

Reply via email to