Yes, it's the same code, but I have integrated the consumer with an Esper CEP
listener.

Similarly, on the producer side I tried passing a customized object (with Java
serialization) in the Message, but since it didn't work properly, I am now
passing only simple Strings to the broker.
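
For illustration only (this is not the code from this thread), a customized object can be turned into a byte array with plain Java serialization before it is placed in a message. The `ItemEvent` class, its fields, and the `ItemEventCodec` helper are hypothetical names; how the resulting bytes are wrapped in a Kafka Message is deliberately left out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ItemEventCodec {

    // Hypothetical payload type; the real customized object is not shown in the thread.
    public static class ItemEvent implements Serializable {
        private static final long serialVersionUID = 1L;
        public final String itemId;
        public final double price;

        public ItemEvent(String itemId, double price) {
            this.itemId = itemId;
            this.price = price;
        }
    }

    // Serialize the event to raw bytes using standard Java serialization.
    public static byte[] toBytes(ItemEvent event) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(event);
        }
        return buffer.toByteArray();
    }

    // Rebuild the event from the bytes produced by toBytes.
    public static ItemEvent fromBytes(byte[] payload)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return (ItemEvent) in.readObject();
        }
    }
}
```

Running a round trip like this outside the broker (`fromBytes(toBytes(event))`) is one way to check whether the serialization itself is at fault or whether the bytes are being altered somewhere between producer and consumer.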

I agree with the IOException explanation, since I am trying different
scenarios.

But the OffsetOutOfRangeException happened at runtime, although it didn't
stop anything. Should I observe it again, I will preserve the logs and post
them back.
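
The condition described in the quoted explanation below can be sketched as a simple range check. This is not Kafka code: `OffsetRangeCheck`, `earliest`, and `latest` are hypothetical names, and in a real consumer the valid range would have to be obtained from the broker. Treating `latest` as an inclusive bound is also an assumption of this sketch.

```java
public class OffsetRangeCheck {

    // True when the requested offset lies inside the range the broker still holds.
    public static boolean inRange(long requested, long earliest, long latest) {
        return requested >= earliest && requested <= latest;
    }

    // Pick an offset to resume from when the requested one is no longer valid.
    public static long resolve(long requested, long earliest, long latest) {
        if (requested < earliest) {
            // Older log segments were deleted; resume from the oldest data left.
            return earliest;
        }
        if (requested > latest) {
            // The requested offset is past the end of the log; resume from the end.
            return latest;
        }
        return requested;
    }
}
```

For example, with the offset 460201737 from the log and a made-up valid range of 500000000 to 600000000, `inRange` returns false and `resolve` returns 500000000. Whether resuming from the earliest or the latest offset is appropriate depends on whether the application can tolerate skipped messages.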



On Wed, Apr 25, 2012 at 12:20 AM, Jun Rao <jun...@gmail.com> wrote:

> The OffsetOutOfRangeException means that the consumer is trying to fetch
> from an offset outside of the range of the log in the broker. This can
> happen if the consumer has stopped consuming for a long time and part of
> the log in the broker has been deleted. The other exceptions seem to be
> related to certain clients getting killed.
>
> What kind of customization did you do? Are you using the java/scala
> producer/consumer provided in Kafka?
>
> Thanks,
>
> Jun
>
>
> On Mon, Apr 23, 2012 at 11:47 PM, navneet sharma <
> navneetsharma0...@gmail.com> wrote:
>
> > Hi,
> >
> > I noticed a few exceptions while trying my customized producer-consumer
> > code:
> >
> > [2012-04-16 14:26:15,385] ERROR error when processing request
> > FetchRequest(topic:itemTopic, part:0 offset:460201737 maxSize:307200)
> > (kafka.server.KafkaRequestHandlers)
> > kafka.common.OffsetOutOfRangeException: offset 460201737 is out of range
> >    at kafka.log.Log$.findRange(Log.scala:48)
> >    at kafka.log.Log.read(Log.scala:224)
> >    at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$readMessageSet(KafkaRequestHandlers.scala:116)
> >    at kafka.server.KafkaRequestHandlers$$anonfun$2.apply(KafkaRequestHandlers.scala:106)
> >    at kafka.server.KafkaRequestHandlers$$anonfun$2.apply(KafkaRequestHandlers.scala:105)
> >    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> >    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> >    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
> >    at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
> >    at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> >    at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
> >    at kafka.server.KafkaRequestHandlers.handleMultiFetchRequest(KafkaRequestHandlers.scala:105)
> >    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$3.apply(KafkaRequestHandlers.scala:45)
> >    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$3.apply(KafkaRequestHandlers.scala:45)
> >    at kafka.network.Processor.handle(SocketServer.scala:289)
> >    at kafka.network.Processor.read(SocketServer.scala:312)
> >    at kafka.network.Processor.run(SocketServer.scala:207)
> >    at java.lang.Thread.run(Thread.java:662)
> >
> > and:
> >
> > [2012-04-17 18:15:09,247] ERROR Closing socket for /127.0.0.1 because of
> > error (kafka.network.Processor)
> > java.io.IOException: Connection reset by peer
> >    at sun.nio.ch.FileDispatcher.read0(Native Method)
> >    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> >    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
> >    at sun.nio.ch.IOUtil.read(IOUtil.java:171)
> >    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
> >    at kafka.utils.Utils$.read(Utils.scala:485)
> >    at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >    at kafka.network.Processor.read(SocketServer.scala:304)
> >    at kafka.network.Processor.run(SocketServer.scala:207)
> >    at java.lang.Thread.run(Thread.java:662)
> >
> >
> >
> > ------------------------------------------------------------------------------------
> > [2012-04-23 17:25:49,325] ERROR Closing socket for /127.0.0.1 because of
> > error (kafka.network.Processor)
> > java.io.IOException: Broken pipe
> >    at sun.nio.ch.FileDispatcher.write0(Native Method)
> >    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:29)
> >    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:69)
> >    at sun.nio.ch.IOUtil.write(IOUtil.java:40)
> >    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:334)
> >    at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:49)
> >    at kafka.network.MultiSend.writeTo(Transmission.scala:95)
> >    at kafka.network.Processor.write(SocketServer.scala:332)
> >    at kafka.network.Processor.run(SocketServer.scala:209)
> >    at java.lang.Thread.run(Thread.java:662)
> >
> >
> > What could be the reason for these exceptions? Can they be ignored, or do I
> > need to change anything?
> >
> > Thanks,
> > Navneet Sharma
> >
>
