Hi Neha,

If I solve problem 1, I think problem 2 will be solved as well (problem 1 is
causing problem 2, the blocked threads).  Can you please let me know what
controls the queue size for the *ConsumerFetcherThread*?
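For context, my understanding (please correct me if this is wrong) is that in the high-level consumer the fetcher queue depth is controlled by `queued.max.message.chunks`, each chunk holding up to `fetch.message.max.bytes` of messages. A minimal sketch of setting it; the connection values here are placeholders, not our real config:

```java
import java.util.Properties;

// Sketch only: "queued.max.message.chunks" bounds how many fetched message
// chunks each ConsumerFetcherThread may queue before its put() blocks
// (assuming the 0.8-era high-level consumer; worth verifying against trunk).
public class FetcherQueueConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder address
        props.put("group.id", "example-group");           // placeholder group
        // Raise the per-fetcher queue above its small default so slow
        // consumer threads are less likely to block the fetcher on enqueue.
        props.put("queued.max.message.chunks", "100");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("queued.max.message.chunks"));
    }
}
```

Raising this only buys headroom, of course; if the consumer threads stop draining, the queue still fills eventually.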


Please see the attached Java source code, which will reproduce the problem.
You may remove the recovery process...  Please check.  We have to do some
work before we start reading from the Kafka stream iterator, and this seems
to cause the *java.lang.IllegalStateException: Iterator is in failed state*.
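For reference, the swallow-and-continue guard our reader thread uses around `hasNext()` looks roughly like the sketch below. It is written against a plain `java.util.Iterator` with a stand-in that fails once, since the real `ConsumerIterator` needs a broker; all names are illustrative:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch of the guard pattern from our source thread: catch the
// IllegalStateException thrown by hasNext(), swallow it, and retry the read.
// With the real ConsumerIterator one would also call resetState() here.
public class GuardedReader {
    public static List<String> drain(Iterator<String> it) {
        List<String> out = new ArrayList<>();
        while (true) {
            try {
                if (!it.hasNext()) {
                    break;
                }
                out.add(it.next());
            } catch (IllegalStateException e) {
                // Swallowed to continue the next read. Note: this spins
                // forever if the iterator never leaves the failed state.
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Stand-in iterator whose hasNext() fails exactly once, then recovers.
        Iterator<String> flaky = new Iterator<String>() {
            private final Iterator<String> inner = List.of("a", "b").iterator();
            private boolean failedOnce = false;

            @Override
            public boolean hasNext() {
                if (!failedOnce) {
                    failedOnce = true;
                    throw new IllegalStateException("Iterator is in failed state");
                }
                return inner.hasNext();
            }

            @Override
            public String next() {
                return inner.next();
            }
        };
        System.out.println(drain(flaky)); // prints [a, b]
    }
}
```

In practice the catch block should also cap retries or back off, so a permanently failed iterator does not spin the thread.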

Please let me know your findings and recommendations.

Thanks,

Bhavesh

On Mon, Oct 27, 2014 at 6:24 PM, Neha Narkhede <neha.narkh...@gmail.com>
wrote:

> >> Sometimes it gives the following exception.
>
> It will help to have a more specific test case that reproduces the failed
> iterator state.
>
> Also, the consumer threads block if the fetcher queue is full. The queue
> can fill up if your consumer thread dies or slows down. I'd recommend you
> ensure that all your consumer threads are alive. You can take a thread dump
> to verify this.
>
> Thanks,
> Neha
>
> On Mon, Oct 27, 2014 at 2:14 PM, Bhavesh Mistry <
> mistry.p.bhav...@gmail.com>
> wrote:
>
> > Hi Neha,
> >
> >
> > I have two problems; any help is greatly appreciated.
> >
> >
> > 1) *java.lang.IllegalStateException: Iterator is in failed state*
> >
> >         ConsumerConnector consumerConnector = Consumer
> >                 .createJavaConsumerConnector(getConsumerConfig());
> >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> >         topicCountMap.put(topic, 32);
> >         Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamMap =
> >                 consumerConnector.createMessageStreams(topicCountMap);
> >
> >         List<KafkaStream<byte[], byte[]>> streams =
> >                 Collections.synchronizedList(topicStreamMap.get(topic));
> >
> >         AppStaticInfo info = Mupd8Main.STATICINFO();
> >
> >         Iterator<KafkaStream<byte[], byte[]>> iterator = streams.iterator();
> >         // remove the head of the list first for this source... the rest
> >         // are for the dynamic source...
> >         mainIterator = iterator.next().iterator();
> >
> >         List<ConsumerIterator<byte[], byte[]>> iteratorList =
> >                 new ArrayList<ConsumerIterator<byte[], byte[]>>(streams.size());
> >         // now the rest of the iterators must be registered..
> >         while (iterator.hasNext()) {
> >             iteratorList.add(iterator.next().iterator());
> >         }
> >         KafkaStreamRegistory.registerStream(mainSourceName, iteratorList);
> >
> > Once the consumer iterators are created and registered, we use them in
> > another thread to start reading.  Sometimes it gives the following
> > exception.
> >
> > 24 Oct 2014 16:03:25,923 ERROR
> > [SourceReader:request_source:LogStreamKafkaSource1]
> > (grizzled.slf4j.Logger.error:116) - SourceThread: exception during reads.
> > Swallowed to continue next read.
> > java.lang.IllegalStateException: Iterator is in failed state
> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
> >
> >
> > I have tried to recover from this state with iterator.resetState(), but
> > sometimes it does not recover.
> >
> >
> >
> >
> > 2) *ConsumerFetcherThreads are blocked on enqueue.  What controls the
> > size of the queue?  Why are they blocked?*  Due to this our lag is
> > increasing, and our threads are blocked on hasNext()...
> >
> >
> >
> > "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-1"
> > prio=5 tid=0x00007fb36292c800 nid=0xab03 waiting on condition [0x0000000116379000]
> >    java.lang.Thread.State: WAITING (parking)
> >         at sun.misc.Unsafe.park(Native Method)
> >         - parking to wait for <0x0000000704019388> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >         at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> >         at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> >         at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> >         at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> >         at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
> >         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> >         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> >         at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> >         at kafka.utils.Utils$.inLock(Utils.scala:535)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
> >         at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
> >         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> >
> >
> >
> > "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-2"
> > prio=5 tid=0x00007fb36229e000 nid=0xa903 waiting on condition [0x0000000116276000]
> >    java.lang.Thread.State: WAITING (parking), parked to wait for <0x0000000704064ce0>,
> > with a stack trace identical to the thread above (blocked in
> > java.util.concurrent.LinkedBlockingQueue.put via kafka.consumer.PartitionTopicInfo.enqueue).
> >
> >
> >
> >
> >
> >
> > Thanks,
> >
> > Bhavesh
> >
> >
> >
> > On Sun, Oct 26, 2014 at 3:14 PM, Neha Narkhede <neha.narkh...@gmail.com>
> > wrote:
> >
> > > Can you provide the steps to reproduce this issue?
> > >
> > > On Fri, Oct 24, 2014 at 6:11 PM, Bhavesh Mistry <
> > > mistry.p.bhav...@gmail.com>
> > > wrote:
> > >
> > > > I am using one from the Kafka Trunk branch.
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > > > On Fri, Oct 24, 2014 at 5:24 PM, Neha Narkhede <
> > neha.narkh...@gmail.com>
> > > > wrote:
> > > >
> > > > > Which version of Kafka are you using on the consumer?
> > > > >
> > > > > On Fri, Oct 24, 2014 at 4:14 PM, Bhavesh Mistry <
> > > > > mistry.p.bhav...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Kafka community,
> > > > > >
> > > > > > I am using the Kafka trunk source code and I get the following
> > > > > > exception.  What could cause the iterator to be in a FAILED state?
> > > > > > Please let me know how I can fix this issue.
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > *java.lang.IllegalStateException: Iterator is in failed state*
> > > > > > *    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)*
> > > > > >
> > > > > > Here are the consumer properties:
> > > > > >
> > > > > >         Properties props = new Properties();
> > > > > >         props.put("zookeeper.connect", zkConnect);
> > > > > >         props.put("group.id", groupId);
> > > > > >         props.put("consumer.timeout.ms", "-1");
> > > > > >         props.put("zookeeper.session.timeout.ms", "10000");
> > > > > >         props.put("zookeeper.sync.time.ms", "6000");
> > > > > >         props.put("auto.commit.interval.ms", "2000");
> > > > > >         props.put("rebalance.max.retries", "8");
> > > > > >         props.put("auto.offset.reset", "largest");
> > > > > >         props.put("fetch.message.max.bytes","2097152");
> > > > > >         props.put("socket.receive.buffer.bytes","2097152");
> > > > > >         props.put("auto.commit.enable","true");
> > > > > >
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Bhavesh
> > > > > >
> > > > >
> > > >
> > >
> >
>
