Bob,

We fixed a bunch of bugs in the log layer recently. Are you running the
latest version of the code from the 0.8 branch?

Thanks,
Neha


On Fri, Mar 22, 2013 at 11:27 AM, Bob Jervis <bjer...@gmail.com> wrote:

> In the midst of the chaos (our app is generating 15GB of logs), I'm also
> seeing the following event on one of our brokers:
>
> 2013-03-22 17:43:39,257 FATAL kafka.server.KafkaApis: [KafkaApi-1] Halting due to unrecoverable I/O error while handling produce request:
> kafka.common.KafkaStorageException: I/O exception in append to log 'v1-english-8-0'
>         at kafka.log.Log.append(Log.scala:218)
>         at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:249)
>         at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:242)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
>         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
>         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>         at scala.collection.immutable.HashMap.map(HashMap.scala:35)
>         at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:242)
>         at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:182)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
>         at java.lang.Thread.run(Thread.java:662)
> Caused by: java.nio.channels.ClosedChannelException
>         at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
>         at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184)
>         at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:128)
>         at kafka.log.FileMessageSet.append(FileMessageSet.scala:191)
>         at kafka.log.LogSegment.append(LogSegment.scala:64)
>         at kafka.log.Log.append(Log.scala:210)
>         ... 14 more
>
>
>
> On Fri, Mar 22, 2013 at 11:00 AM, Bob Jervis <bjer...@gmail.com> wrote:
>
> > I am getting the logs and I am trying to make sense of them.  I see a
> > 'Received Request' log entry that appears to be what is coming in from
> > our app.  I don't see any 'Completed Request' entries that correspond to
> > those.  The only completed entries I see for the logs in question are
> > from the replica-fetcher.
> >
> > It is as if our app is asking the wrong broker and getting no answer, but
> > for some reason reporting it as a socket timeout.
> >
> > Broker 0 is getting and completing TopicMetadata requests in about 600
> > milliseconds each.
> > Broker 1 is not reporting ANY TopicMetadataRequests in the TRACE logs.
> >
> > Our app logs don't make any sense when I compare them to the broker
> > logs.  How can we be getting timeouts in less than 1000 milliseconds?
> >
> > Our app is reporting this:
> >
> > 2013-03-22 17:42:23,047 WARN kafka.producer.async.DefaultEventHandler:
> > failed to send to broker 1 with data Map([v1-english-5,0] ->
> > ByteBufferMessageSet(MessageAndOffset(Message(magic = 0, attributes = 0,
> > crc = 2606857931, key = null, payload = java.nio.HeapByteBuffer[pos=0
> > lim=1700 cap=1700]),0), MessageAndOffset(Message(magic = 0, attributes = 0,
> > crc = 735213417, key = null, payload = java.nio.HeapByteBuffer[pos=0
> > lim=1497 cap=1497]),1), MessageAndOffset(Message(magic = 0, attributes = 0,
> > crc = 2435755724, key = null, payload = java.nio.HeapByteBuffer[pos=0
> > lim=1494 cap=1494]),2), MessageAndOffset(Message(magic = 0, attributes = 0,
> > crc = 202370440, key = null, paylo.....
> > java.net.SocketTimeoutException
> >         at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
> >         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
> >         at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
> >         at kafka.utils.Utils$.read(Utils.scala:372)
> >         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >         at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >         at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> >         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> >         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
> >         at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
> >         at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:98)
> >         at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:98)
> >         at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:98)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:97)
> >         at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:97)
> >         at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:97)
> >         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >         at kafka.producer.SyncProducer.send(SyncProducer.scala:96)
> >         at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:221)
> >         at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:91)
> >         at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:85)
> >         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> >         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
> >         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> >         at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
> >         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
> >         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> >         at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
> >         at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:85)
> >         at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:61)
> >         at kafka.producer.Producer.send(Producer.scala:76)
> >         at kafka.javaapi.producer.Producer.send(Producer.scala:41)
> >         at com.visibletechnologies.platform.common.kafka.KafkaWriter.flush(KafkaWriter.java:131)
> >         at com.visibletechnologies.platform.common.kafka.KafkaWriter.checkFlush(KafkaWriter.java:97)
> >         at com.visibletechnologies.platform.katta.krouter.KRouter.checkFlush(KRouter.java:182)
> >         at com.visibletechnologies.platform.katta.krouter.KRouter.doWork(KRouter.java:139)
> >         at com.visibletechnologies.framework.servicebase.ServiceBase.start(ServiceBase.java:187)
> >         at com.visibletechnologies.platform.katta.krouter.Main.main(Main.java:132)
> > 2013-03-22 17:42:23,157 INFO kafka.client.ClientUtils$: Fetching metadata
> > for topic Set(v1-japanese-0, v1-indonesian-5, v1-french-3, v1-other-4,
> > v1-portuguese-1, v1-other-1, v1-german-2, v1-english-15, v1-english-8,
> > v1-portuguese-6, v1-arabic-0, v1-english-6, v1-spanish-5, v1-english-10,
> > v1-japanese-4, v1-english-1, v1-italian-3, v1-spanish-1, v1-english-5,
> > v1-other-7, v1-portuguese-2, v1-other-0, v1-indonesian-4, v1-english-9,
> > v1-japanese-1, v1-spanish-2, v1-portuguese-7, v1-german-3, v1-arabic-1,
> > v1-japanese-7, v1-korean-0, v1-spanish-4, v1-russian-0, v1-other-3,
> > v1-japanese-3, v1-english-0, v1-french-1, v1-indonesian-0, v1-arabic-2,
> > v1-english-4, v1-french-0, v1-indonesian-7, v1-english-12, v1-spanish-3,
> > v1-japanese-6, v1-indonesian-3, v1-english-13, v1-other-6, v1-portuguese-3,
> > v1-italian-1, v1-english-7, v1-german-0, v1-korean-3, v1-spanish-7,
> > v1-russian-1, v1-indonesian-1, v1-portuguese-0, v1-indonesian-6,
> > v1-french-2, v1-russian-2, v1-english-11, v1-japanese-2, v1-other-2,
> > v1-indonesian-2, v1-english-14, v1-portuguese-5, v1-spanish-6, v1-german-1,
> > v1-italian-0, v1-english-2, v1-japanese-5, v1-portuguese-4,
> > v1-chinese-simplified-3, v1-spanish-0, v1-english-3, v1-arabic-3,
> > v1-other-5)
> > 2013-03-22 17:42:23,157 INFO kafka.producer.SyncProducer: Connected to
> > kafka01.qa.viq:9092 for producing
> > 2013-03-22 17:42:23,344 INFO kafka.producer.SyncProducer: Disconnecting
> > from kafka01.qa.viq:9092
> > 2013-03-22 17:42:23,352 INFO kafka.producer.SyncProducer: Connected to
> > 100.100.106.91:9092 for producing
> > 2013-03-22 17:42:24,915 INFO kafka.producer.SyncProducer: Disconnecting
> > from 100.100.106.91:9092
> >
> >
> >
> >
> >
> >
> > On Fri, Mar 22, 2013 at 10:06 AM, Jun Rao <jun...@gmail.com> wrote:
> >
> >> The metadata request is sent to the broker, which will read from ZK. I
> >> suggest that you turn on trace level logging for class
> >> kafka.network.RequestChannel$ in all brokers. The log will tell you how
> >> long each metadata request takes on the broker. You can then set your
> >> socket timeout in the producer accordingly.
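> >>
> >> For reference, a rough sketch of that log4j change in each broker's
> >> log4j.properties (this assumes the requestAppender from the stock
> >> config; adjust the appender name to whatever your setup uses):
> >>
> >> # log a trace entry for every request received/completed, with timings
> >> log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
> >> log4j.additivity.kafka.network.RequestChannel$=false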
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >> On Fri, Mar 22, 2013 at 9:38 AM, Bob Jervis <bjer...@gmail.com> wrote:
> >>
> >>> I've tried this and it appears that we are still seeing the issue.
> >>> Here is a stack trace of one of the socket timeout exceptions we are
> >>> seeing (we converted to the SimpleConsumer):
> >>>
> >>> 2013-03-22 04:54:51,807 INFO kafka.client.ClientUtils$: Fetching
> >>> metadata for topic Set(v1-japanese-0, v1-indonesian-5, v1-french-3,
> >>> v1-other-4, v1-portuguese-1, v1-other-1, v1-german-2, v1-english-15,
> >>> v1-english-8, v1-portuguese-6, v1-arabic-0, v1-english-6, v1-korean-1,
> >>> v1-spanish-5, v1-english-10, v1-japanese-4, v1-english-1, v1-italian-3,
> >>> v1-spanish-1, v1-english-5, v1-other-7, v1-portuguese-2, v1-other-0,
> >>> v1-indonesian-4, v1-english-9, v1-japanese-1, v1-spanish-2,
> >>> v1-portuguese-7, v1-german-3, v1-arabic-1, v1-japanese-7, v1-spanish-4,
> >>> v1-other-3, v1-japanese-3, v1-english-0, v1-french-1, v1-indonesian-0,
> >>> v1-arabic-2, v1-english-4, v1-french-0, v1-indonesian-7, v1-english-12,
> >>> v1-spanish-3, v1-japanese-6, v1-indonesian-3, v1-english-13, v1-other-6,
> >>> v1-portuguese-3, v1-italian-1, v1-english-7, v1-german-0, v1-korean-3,
> >>> v1-spanish-7, v1-farsi-2, v1-russian-1, v1-indonesian-1, v1-portuguese-0,
> >>> v1-indonesian-6, v1-french-2, v1-english-11, v1-japanese-2, v1-other-2,
> >>> v1-indonesian-2, v1-english-14, v1-portuguese-5, v1-spanish-6, v1-german-1,
> >>> v1-italian-0, v1-english-2, v1-japanese-5, v1-portuguese-4, v1-spanish-0,
> >>> v1-english-3, v1-arabic-3, v1-other-5, v1-korean-2)
> >>> 2013-03-22 04:54:51,808 INFO kafka.producer.SyncProducer: Connected to
> >>> kafka01.qa.viq:9092 for producing
> >>> 2013-03-22 04:54:51,983 INFO kafka.producer.SyncProducer: Disconnecting
> >>> from kafka01.qa.viq:9092
> >>> 2013-03-22 04:54:51,987 INFO kafka.producer.SyncProducer: Connected to
> >>> 100.100.106.91:9092 for producing
> >>> 2013-03-22 14:41:42,650 INFO kafka.consumer.SimpleConsumer: Reconnect
> >>> due to socket error:
> >>> java.net.SocketTimeoutException
> >>>         at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
> >>>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
> >>>         at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
> >>>         at kafka.utils.Utils$.read(Utils.scala:372)
> >>>         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> >>>         at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> >>>         at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> >>>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
> >>>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:124)
> >>>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:122)
> >>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:161)
> >>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:161)
> >>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:161)
> >>>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:160)
> >>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:160)
> >>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:160)
> >>>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> >>>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:159)
> >>>         at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:48)
> >>>         at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:58)
> >>>         at com.visibletechnologies.platform.common.kafka.KafkaReader.initializeIterator(KafkaReader.java:231)
> >>>         at com.visibletechnologies.platform.common.kafka.KafkaReader.read(KafkaReader.java:181)
> >>>         at com.visibletechnologies.platform.katta.krouter.KRouter.doWork(KRouter.java:117)
> >>>         at com.visibletechnologies.framework.servicebase.ServiceBase.start(ServiceBase.java:187)
> >>>         at com.visibletechnologies.platform.katta.krouter.Main.main(Main.java:132)
> >>>
> >>> I don't see any Zookeeper interactions here.  It looks like fetch
> >>> requests to the Kafka broker, but maybe I'm missing something.
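> >>>
> >>> In case it matters for the timeouts above, here is a rough sketch of
> >>> how we could raise the socket timeout when constructing the 0.8
> >>> javaapi SimpleConsumer (the buffer size and clientId below are
> >>> placeholders, not our real values):
> >>>
> >>> import kafka.javaapi.consumer.SimpleConsumer;
> >>>
> >>> // soTimeout is the socket read timeout that the fetch above runs with
> >>> SimpleConsumer consumer = new SimpleConsumer(
> >>>         "kafka01.qa.viq",   // broker host
> >>>         9092,               // broker port
> >>>         60 * 1000,          // soTimeout in ms, raised well above the default
> >>>         64 * 1024,          // receive buffer size in bytes
> >>>         "krouter-reader");  // clientId, made up for this example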
> >>>
> >>>
> >>> On Thu, Mar 21, 2013 at 9:16 PM, Jun Rao <jun...@gmail.com> wrote:
> >>>
> >>>> Bob,
> >>>>
> >>>> Currently, the metadata request needs to do at least one ZK read per
> >>>> partition, so the more topics/partitions you have, the longer the
> >>>> request takes. You need to increase the request timeout. Try something
> >>>> like 60 * 1000 ms.
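> >>>>
> >>>> If it helps, a sketch of what that looks like in the 0.8 producer
> >>>> properties. My understanding is that request.timeout.ms is also what
> >>>> the SyncProducer uses as its socket read timeout, so it is the setting
> >>>> to bump here; the broker list below just reuses hosts that appear
> >>>> earlier in this thread:
> >>>>
> >>>> import java.util.Properties;
> >>>> import kafka.javaapi.producer.Producer;
> >>>> import kafka.producer.ProducerConfig;
> >>>>
> >>>> public class ProducerTimeoutExample {
> >>>>     public static void main(String[] args) {
> >>>>         Properties props = new Properties();
> >>>>         props.put("metadata.broker.list", "kafka01.qa.viq:9092,100.100.106.91:9092");
> >>>>         // give slow TopicMetadata responses a full minute before timing out
> >>>>         props.put("request.timeout.ms", "60000");
> >>>>         Producer<byte[], byte[]> producer =
> >>>>                 new Producer<byte[], byte[]>(new ProducerConfig(props));
> >>>>         producer.close();
> >>>>     }
> >>>> }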
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Jun
> >>>>
> >>>> On Thu, Mar 21, 2013 at 12:46 PM, Bob Jervis <bjer...@gmail.com> wrote:
> >>>>
> >>>>> We are seeing horrible problems.  We cannot move data through our 0.8
> >>>>> broker because we are getting socket timeout exceptions, and I cannot
> >>>>> figure out what the settings should be.  The fetch metadata stuff is
> >>>>> throwing these exceptions, and no matter how I tweak the timeouts, I
> >>>>> still get horrible timeouts and no progress on moving data.
> >>>>>
> >>>>> On test environments where there are only 12 topics there are no
> >>>>> problems.
> >>>>>
> >>>>> When the number of topics goes to ~75, we can't move anything because
> >>>>> the fetch metadata requests time out.
> >>>>>
> >>>>> What can we do to fix this?????????
> >>>>>
> >>>>> I am desperate.
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >
>
