Jun, I agree that the iterator should be re-iterable after a timeout, but I was referring to its current behavior, which does not allow re-iteration after a timeout (look at maybeComputeNext()) Today, the only way to re-iterate after a timeout is to re-create the KafkaMessageStream, which essentially means we don't allow re-iteration from a timed-out iterator.
Patricio, >> So answering Neha's question, I do create the iterator again. The bug is that under the covers, we don't create a new iterator, we reuse the iterator which entered a failed state after the timeout. Hence, you see the IllegalStateException. Thanks for writing up that test. Do you mind filing a bug and attaching your test there ? -Neha 2012/1/5 Patricio Echagüe <patric...@gmail.com>: > I wrote this small test case: http://pastebin.com/Lv1wgK36 > > First time there is a iterator timeout, my code catches it (line 70). The > second iteration of the while loop falls into the IllegalStateException > after requesting a new iterator from the same stream. > > 2012/1/5 Patricio Echagüe <patric...@gmail.com> > >> Jun and Neha, Thanks again for the quick response. >> >> I use Kafka 0.7 >> >> What I'm trying to achieve is to not block on the iterator. So I set up a >> consumer.timeout.ms=300. The pseudo code is something like this: >> >> private void myMethod() { >> try { >> >> for(Message message: stream) { >> >> System.out.println("Message received: " + message.toString()); >> >> } >> >> } catch (ConsumerTimeoutException e) { >> >> // do something >> >> } >> >> } >> >> >> when a timeout occurs, next time this method is called it does >> stream.iterator() (see the "for" loop in my snippet). So answering Neha's >> question, I do create the iterator again. What I don't re-create is the >> MessageStream. Do I need to re-create the stream as well ? >> >> >> Just requesting a new iterator after a timeout doesn't seem to help. >> >> On Thu, Jan 5, 2012 at 1:16 PM, Jun Rao <jun...@gmail.com> wrote: >> >>> Actually, the consumer stream is supposed to be re-iterable after the >>> timeout. But make sure that you check hasNext before calling next when you >>> resume the consumption. >>> >>> Also, could you try this on the 0.7 release? We fixed a bunch of issues >>> between 0.6 and 0.7. >>> >>> Thanks, >>> >>> Jun >>> >>> On Thu, Jan 5, 2012 at 12:42 PM, Neha Narkhede <neha.narkh...@gmail.com >>> >wrote: >>> >>> > Hi Patricio, >>> > >>> > > I set up a 300 ms timeout. Once the timeout occurs, next time I ask >>> for >>> > the >>> > > iterator from the same strem, I get a: >>> > >>> > By timeout, I'm guessing you mean setting consumer.timeout.ms=300. If >>> > you do this, it just means that the consumer iterators will shut down >>> > if they don't get another set of messages from Kafka within 300 ms. >>> > Since the iterators shut down, it is illegal to call hasNext()/next() >>> > on the iterators, without recreating them. The way to recreate the >>> > iterators is via the createMessageStreams() API in >>> > ZookeeperConsumerConnector. >>> > >>> > Thanks, >>> > Neha >>> > >>> > 2012/1/5 Patricio Echagüe <patric...@gmail.com>: >>> > > Hi again. I think I'm running into the Iterator issue mentioned here: >>> > > >>> > > >>> > >>> http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201201.mbox/%3CCALMKdpuEJfjdo8eHoA-7jGfgp6HhzYiJZRrxLcQCfTK71o%3DgkQ%40mail.gmail.com%3E >>> > > >>> > > I set up a 300 ms timeout. Once the timeout occurs, next time I ask >>> for >>> > the >>> > > iterator from the same strem, I get a: >>> > > >>> > > java.lang.IllegalStateException: Iterator is in failed state >>> > > >>> > > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:47) >>> > > >>> > > at com.lucid.dao.queue.impl.kafka.KafkaConsumerIterator.hasNext( >>> > > KafkaConsumerIterator.java:21) >>> > > >>> > > ..... >>> > > >>> > > ..... >>> > > >>> > > Note: I'm using the latest Kafka release. >>> > > >>> > > Any suggestion ? >>> > > >>> > > Thanks >>> > >>> >> >>