No problem. I'll do that. Thank you guys.
On Thu, Jan 5, 2012 at 4:31 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> Jun,
>
> I agree that the iterator should be re-iterable after a timeout, but I
> was referring to its current behavior, which does not allow
> re-iteration after a timeout (look at maybeComputeNext()).
> Today, the only way to re-iterate after a timeout is to re-create the
> KafkaMessageStream, which essentially means we don't allow
> re-iteration from a timed-out iterator.
>
> Patricio,
>
> >> So answering Neha's question, I do create the iterator again.
>
> The bug is that under the covers, we don't create a new iterator, we
> reuse the iterator which entered a failed state after the timeout.
> Hence, you see the IllegalStateException.
> Thanks for writing up that test. Do you mind filing a bug and
> attaching your test there?
>
> -Neha
>
> 2012/1/5 Patricio Echagüe <patric...@gmail.com>:
> > I wrote this small test case: http://pastebin.com/Lv1wgK36
> >
> > The first time there is an iterator timeout, my code catches it (line 70).
> > The second iteration of the while loop falls into the IllegalStateException
> > after requesting a new iterator from the same stream.
> >
> > 2012/1/5 Patricio Echagüe <patric...@gmail.com>
> >
> >> Jun and Neha, Thanks again for the quick response.
> >>
> >> I use Kafka 0.7
> >>
> >> What I'm trying to achieve is to not block on the iterator. So I set up
> >> consumer.timeout.ms=300. The pseudo code is something like this:
> >>
> >> private void myMethod() {
> >>     try {
> >>         for (Message message : stream) {
> >>             System.out.println("Message received: " + message.toString());
> >>         }
> >>     } catch (ConsumerTimeoutException e) {
> >>         // do something
> >>     }
> >> }
> >>
> >> When a timeout occurs, next time this method is called it does
> >> stream.iterator() (see the "for" loop in my snippet). So answering Neha's
> >> question, I do create the iterator again. What I don't re-create is the
> >> MessageStream. Do I need to re-create the stream as well?
> >>
> >> Just requesting a new iterator after a timeout doesn't seem to help.
> >>
> >> On Thu, Jan 5, 2012 at 1:16 PM, Jun Rao <jun...@gmail.com> wrote:
> >>
> >>> Actually, the consumer stream is supposed to be re-iterable after the
> >>> timeout. But make sure that you check hasNext before calling next when
> >>> you resume the consumption.
> >>>
> >>> Also, could you try this on the 0.7 release? We fixed a bunch of issues
> >>> between 0.6 and 0.7.
> >>>
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>> On Thu, Jan 5, 2012 at 12:42 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >>>
> >>> > Hi Patricio,
> >>> >
> >>> > > I set up a 300 ms timeout. Once the timeout occurs, next time I ask
> >>> > > for the iterator from the same stream, I get a:
> >>> >
> >>> > By timeout, I'm guessing you mean setting consumer.timeout.ms=300. If
> >>> > you do this, it just means that the consumer iterators will shut down
> >>> > if they don't get another set of messages from Kafka within 300 ms.
> >>> > Since the iterators shut down, it is illegal to call hasNext()/next()
> >>> > on the iterators, without recreating them. The way to recreate the
> >>> > iterators is via the createMessageStreams() API in
> >>> > ZookeeperConsumerConnector.
> >>> >
> >>> > Thanks,
> >>> > Neha
> >>> >
> >>> > 2012/1/5 Patricio Echagüe <patric...@gmail.com>:
> >>> > > Hi again. I think I'm running into the Iterator issue mentioned here:
> >>> > >
> >>> > > http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201201.mbox/%3CCALMKdpuEJfjdo8eHoA-7jGfgp6HhzYiJZRrxLcQCfTK71o%3DgkQ%40mail.gmail.com%3E
> >>> > >
> >>> > > I set up a 300 ms timeout. Once the timeout occurs, next time I ask
> >>> > > for the iterator from the same stream, I get a:
> >>> > >
> >>> > > java.lang.IllegalStateException: Iterator is in failed state
> >>> > >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:47)
> >>> > >     at com.lucid.dao.queue.impl.kafka.KafkaConsumerIterator.hasNext(KafkaConsumerIterator.java:21)
> >>> > >     .....
> >>> > >     .....
> >>> > >
> >>> > > Note: I'm using the latest Kafka release.
> >>> > >
> >>> > > Any suggestion?
> >>> > >
> >>> > > Thanks
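
For anyone reading this thread later, the failure mode described above (a timed-out iterator stays in a failed state and is reused on the next stream.iterator() call) can be modelled in plain Java. This is only a sketch under assumptions: StickyIterator, ResettableIterator, and TimeoutException are hypothetical stand-ins and not Kafka classes, and the state handling is inferred from the description in this thread rather than copied from the Kafka source.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of the behavior discussed in this thread.
// None of these types are Kafka classes.
class TimeoutIteratorSketch {

    /** Stand-in for the consumer timeout exception. */
    static class TimeoutException extends RuntimeException {}

    enum State { NOT_READY, READY, FAILED }

    /** Marks itself FAILED before fetching; a timeout leaves it FAILED for good. */
    static class StickyIterator {
        protected final Queue<String> source;
        protected State state = State.NOT_READY;
        private String nextItem;

        StickyIterator(Queue<String> source) { this.source = source; }

        boolean hasNext() {
            if (state == State.FAILED) {
                throw new IllegalStateException("Iterator is in failed state");
            }
            if (state == State.READY) {
                return true;
            }
            state = State.FAILED;   // only a successful fetch clears this
            nextItem = fetch();     // may throw TimeoutException
            state = State.READY;
            return true;
        }

        String next() {
            hasNext();
            state = State.NOT_READY;
            return nextItem;
        }

        /** Simulates a poll that times out when no message is queued. */
        private String fetch() {
            String item = source.poll();
            if (item == null) {
                throw new TimeoutException();
            }
            return item;
        }
    }

    /** Same iterator, but a timeout resets the state so iteration can resume. */
    static class ResettableIterator extends StickyIterator {
        ResettableIterator(Queue<String> source) { super(source); }

        @Override
        boolean hasNext() {
            try {
                return super.hasNext();
            } catch (TimeoutException e) {
                state = State.NOT_READY;
                throw e;
            }
        }
    }

    /** Times out once on an empty queue, then retries after a message arrives. */
    static String retryAfterTimeout(boolean resettable) {
        Queue<String> source = new ArrayDeque<>();
        StickyIterator it = resettable
                ? new ResettableIterator(source)
                : new StickyIterator(source);
        try {
            it.hasNext();
        } catch (TimeoutException e) {
            // first attempt timed out, as with consumer.timeout.ms=300
        }
        source.add("m1");
        try {
            return it.next();
        } catch (IllegalStateException e) {
            return "IllegalStateException: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(retryAfterTimeout(false));
        System.out.println(retryAfterTimeout(true));
    }
}
```

In this model the sticky variant reproduces the reported exception on the second attempt, while the resettable variant returns the queued message. Whether the real fix should reset the state or hand out a fresh iterator is a design question for the bug report.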