I wrote this small test case: http://pastebin.com/Lv1wgK36
First time there is a iterator timeout, my code catches it (line 70). The second iteration of the while loop falls into the IllegalStateException after requesting a new iterator from the same stream. 2012/1/5 Patricio Echagüe <patric...@gmail.com> > Jun and Neha, Thanks again for the quick response. > > I use Kafka 0.7 > > What I'm trying to achieve is to not block on the iterator. So I set up a > consumer.timeout.ms=300. The pseudo code is something like this: > > private void myMethod() { > try { > > for(Message message: stream) { > > System.out.println("Message received: " + message.toString()); > > } > > } catch (ConsumerTimeoutException e) { > > // do something > > } > > } > > > when a timeout occurs, next time this method is called it does > stream.iterator() (see the "for" loop in my snippet). So answering Neha's > question, I do create the iterator again. What I don't re-create is the > MessageStream. Do I need to re-create the stream as well ? > > > Just requesting a new iterator after a timeout doesn't seem to help. > > On Thu, Jan 5, 2012 at 1:16 PM, Jun Rao <jun...@gmail.com> wrote: > >> Actually, the consumer stream is supposed to be re-iterable after the >> timeout. But make sure that you check hasNext before calling next when you >> resume the consumption. >> >> Also, could you try this on the 0.7 release? We fixed a bunch of issues >> between 0.6 and 0.7. >> >> Thanks, >> >> Jun >> >> On Thu, Jan 5, 2012 at 12:42 PM, Neha Narkhede <neha.narkh...@gmail.com >> >wrote: >> >> > Hi Patricio, >> > >> > > I set up a 300 ms timeout. Once the timeout occurs, next time I ask >> for >> > the >> > > iterator from the same strem, I get a: >> > >> > By timeout, I'm guessing you mean setting consumer.timeout.ms=300. If >> > you do this, it just means that the consumer iterators will shut down >> > if they don't get another set of messages from Kafka within 300 ms. >> > Since the iterators shut down, it is illegal to call hasNext()/next() >> > on the iterators, without recreating them. The way to recreate the >> > iterators is via the createMessageStreams() API in >> > ZookeeperConsumerConnector. >> > >> > Thanks, >> > Neha >> > >> > 2012/1/5 Patricio Echagüe <patric...@gmail.com>: >> > > Hi again. I think I'm running into the Iterator issue mentioned here: >> > > >> > > >> > >> http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201201.mbox/%3CCALMKdpuEJfjdo8eHoA-7jGfgp6HhzYiJZRrxLcQCfTK71o%3DgkQ%40mail.gmail.com%3E >> > > >> > > I set up a 300 ms timeout. Once the timeout occurs, next time I ask >> for >> > the >> > > iterator from the same strem, I get a: >> > > >> > > java.lang.IllegalStateException: Iterator is in failed state >> > > >> > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:47) >> > > >> > > at com.lucid.dao.queue.impl.kafka.KafkaConsumerIterator.hasNext( >> > > KafkaConsumerIterator.java:21) >> > > >> > > ..... >> > > >> > > ..... >> > > >> > > Note: I'm using the latest Kafka release. >> > > >> > > Any suggestion ? >> > > >> > > Thanks >> > >> > >