We've made some progress in our testing. While I don't have a good explanation for all of the improved behavior, we have been able to move a substantial number of messages through the system today without any exceptions (> 800K messages).
The big things between last night's mess and today were:

1. I moved the Kafka log dir (the segment files) to a separate drive from the system drive, and
2. I reduced the number of network and io threads back down to 2 each.

We also found a (probably) unrelated bug where we were getting the broker 0 and broker 1 host name mappings swapped (something about Zookeeper returning children in any old order), so we weren't asking for topic offsets from the correct broker. The code worked fine when there was only one broker, but in a multi-broker cluster we got bogus results. (A rough sketch of the fix is at the end of this message, below the quoted thread.)

Thanks for all the help,

Bob

On Fri, Mar 22, 2013 at 11:27 AM, Bob Jervis <bjer...@gmail.com> wrote:

> I'm also seeing, in the midst of the chaos (our app is generating 15GB of
> logs), the following event on one of our brokers:
>
> 2013-03-22 17:43:39,257 FATAL kafka.server.KafkaApis: [KafkaApi-1] Halting
> due to unrecoverable I/O error while handling produce request:
> kafka.common.KafkaStorageException: I/O exception in append to log
> 'v1-english-8-0'
>         at kafka.log.Log.append(Log.scala:218)
>         at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:249)
>         at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:242)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>         at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:125)
>         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
>         at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:344)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>         at scala.collection.immutable.HashMap.map(HashMap.scala:35)
>         at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:242)
>         at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:182)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:41)
>         at java.lang.Thread.run(Thread.java:662)
> Caused by: java.nio.channels.ClosedChannelException
>         at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:88)
>         at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:184)
>         at kafka.message.ByteBufferMessageSet.writeTo(ByteBufferMessageSet.scala:128)
>         at kafka.log.FileMessageSet.append(FileMessageSet.scala:191)
>         at kafka.log.LogSegment.append(LogSegment.scala:64)
>         at kafka.log.Log.append(Log.scala:210)
>         ... 14 more
>
> On Fri, Mar 22, 2013 at 11:00 AM, Bob Jervis <bjer...@gmail.com> wrote:
>
>> I am getting the logs and I am trying to make sense of them. I see a
>> 'Received Request' log entry that appears to be what is coming in from our
>> app. I don't see any 'Completed Request' entries that correspond to those.
>> The only completed entries I see for the logs in question are from the
>> replica-fetcher.
>>
>> It is as if our app is asking the wrong broker and getting no answer, but
>> for some reason reporting it as a socket timeout.
>>
>> Broker 0 is getting and completing TopicMetadata requests in about 600
>> milliseconds each.
>> Broker 1 is not reporting ANY TopicMetadataRequests in the TRACE logs.
>>
>> Our app logs don't make any sense when I compare them to the broker logs,
>> and how can we be getting timeouts in less than 1000 milliseconds?
>>
>> Our app is reporting this:
>>
>> 2013-03-22 17:42:23,047 WARN kafka.producer.async.DefaultEventHandler:
>> failed to send to broker 1 with data Map([v1-english-5,0] ->
>> ByteBufferMessageSet(MessageAndOffset(Message(magic = 0, attributes = 0,
>> crc = 2606857931, key = null, payload = java.nio.HeapByteBuffer[pos=0
>> lim=1700 cap=1700]),0), MessageAndOffset(Message(magic = 0, attributes = 0,
>> crc = 735213417, key = null, payload = java.nio.HeapByteBuffer[pos=0
>> lim=1497 cap=1497]),1), MessageAndOffset(Message(magic = 0, attributes = 0,
>> crc = 2435755724, key = null, payload = java.nio.HeapByteBuffer[pos=0
>> lim=1494 cap=1494]),2), MessageAndOffset(Message(magic = 0, attributes = 0,
>> crc = 202370440, key = null, paylo.....
>> java.net.SocketTimeoutException
>>         at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
>>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
>>         at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
>>         at kafka.utils.Utils$.read(Utils.scala:372)
>>         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>         at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>         at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>>         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>>         at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:71)
>>         at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:98)
>>         at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:98)
>>         at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:98)
>>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>         at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:97)
>>         at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:97)
>>         at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:97)
>>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>         at kafka.producer.SyncProducer.send(SyncProducer.scala:96)
>>         at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:221)
>>         at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:91)
>>         at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$1.apply(DefaultEventHandler.scala:85)
>>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
>>         at scala.collection.Iterator$class.foreach(Iterator.scala:631)
>>         at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
>>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
>>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
>>         at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:85)
>>         at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:61)
>>         at kafka.producer.Producer.send(Producer.scala:76)
>>         at kafka.javaapi.producer.Producer.send(Producer.scala:41)
>>         at com.visibletechnologies.platform.common.kafka.KafkaWriter.flush(KafkaWriter.java:131)
>>         at com.visibletechnologies.platform.common.kafka.KafkaWriter.checkFlush(KafkaWriter.java:97)
>>         at com.visibletechnologies.platform.katta.krouter.KRouter.checkFlush(KRouter.java:182)
>>         at com.visibletechnologies.platform.katta.krouter.KRouter.doWork(KRouter.java:139)
>>         at com.visibletechnologies.framework.servicebase.ServiceBase.start(ServiceBase.java:187)
>>         at com.visibletechnologies.platform.katta.krouter.Main.main(Main.java:132)
>> 2013-03-22 17:42:23,157 INFO kafka.client.ClientUtils$: Fetching metadata
>> for topic Set(v1-japanese-0, v1-indonesian-5, v1-french-3, v1-other-4,
>> v1-portuguese-1, v1-other-1, v1-german-2, v1-english-15, v1-english-8,
>> v1-portuguese-6, v1-arabic-0, v1-english-6, v1-spanish-5, v1-english-10,
>> v1-japanese-4, v1-english-1, v1-italian-3, v1-spanish-1, v1-english-5,
>> v1-other-7, v1-portuguese-2, v1-other-0, v1-indonesian-4, v1-english-9,
>> v1-japanese-1, v1-spanish-2, v1-portuguese-7, v1-german-3, v1-arabic-1,
>> v1-japanese-7, v1-korean-0, v1-spanish-4, v1-russian-0, v1-other-3,
>> v1-japanese-3, v1-english-0, v1-french-1, v1-indonesian-0, v1-arabic-2,
>> v1-english-4, v1-french-0, v1-indonesian-7, v1-english-12, v1-spanish-3,
>> v1-japanese-6, v1-indonesian-3, v1-english-13, v1-other-6, v1-portuguese-3,
>> v1-italian-1, v1-english-7, v1-german-0, v1-korean-3, v1-spanish-7,
>> v1-russian-1, v1-indonesian-1, v1-portuguese-0, v1-indonesian-6,
>> v1-french-2, v1-russian-2, v1-english-11, v1-japanese-2, v1-other-2,
>> v1-indonesian-2, v1-english-14, v1-portuguese-5, v1-spanish-6, v1-german-1,
>> v1-italian-0, v1-english-2, v1-japanese-5, v1-portuguese-4,
>> v1-chinese-simplified-3, v1-spanish-0, v1-english-3, v1-arabic-3,
>> v1-other-5)
>> 2013-03-22 17:42:23,157 INFO kafka.producer.SyncProducer: Connected to
>> kafka01.qa.viq:9092 for producing
>> 2013-03-22 17:42:23,344 INFO kafka.producer.SyncProducer: Disconnecting
>> from kafka01.qa.viq:9092
>> 2013-03-22 17:42:23,352 INFO kafka.producer.SyncProducer: Connected to
>> 100.100.106.91:9092 for producing
>> 2013-03-22 17:42:24,915 INFO kafka.producer.SyncProducer: Disconnecting
>> from 100.100.106.91:9092
>>
>> On Fri, Mar 22, 2013 at 10:06 AM, Jun Rao <jun...@gmail.com> wrote:
>>
>>> The metadata request is sent to the broker, which will read from ZK. I
>>> suggest that you turn on trace level logging for class
>>> kafka.network.RequestChannel$ in all brokers. The log will tell you how
>>> long each metadata request takes on the broker. You can then set your
>>> socket timeout in the producer accordingly.
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Fri, Mar 22, 2013 at 9:38 AM, Bob Jervis <bjer...@gmail.com> wrote:
>>>
>>>> I've tried this and it appears that we are still seeing the issue.
>>>>
>>>> Here is a stack trace of one of the socket timeout exceptions we are
>>>> seeing (we converted to the SimpleConsumer):
>>>>
>>>> 2013-03-22 04:54:51,807 INFO kafka.client.ClientUtils$: Fetching
>>>> metadata for topic Set(v1-japanese-0, v1-indonesian-5, v1-french-3,
>>>> v1-other-4, v1-portuguese-1, v1-other-1, v1-german-2, v1-english-15,
>>>> v1-english-8, v1-portuguese-6, v1-arabic-0, v1-english-6, v1-korean-1,
>>>> v1-spanish-5, v1-english-10, v1-japanese-4, v1-english-1, v1-italian-3,
>>>> v1-spanish-1, v1-english-5, v1-other-7, v1-portuguese-2, v1-other-0,
>>>> v1-indonesian-4, v1-english-9, v1-japanese-1, v1-spanish-2,
>>>> v1-portuguese-7, v1-german-3, v1-arabic-1, v1-japanese-7, v1-spanish-4,
>>>> v1-other-3, v1-japanese-3, v1-english-0, v1-french-1, v1-indonesian-0,
>>>> v1-arabic-2, v1-english-4, v1-french-0, v1-indonesian-7, v1-english-12,
>>>> v1-spanish-3, v1-japanese-6, v1-indonesian-3, v1-english-13, v1-other-6,
>>>> v1-portuguese-3, v1-italian-1, v1-english-7, v1-german-0, v1-korean-3,
>>>> v1-spanish-7, v1-farsi-2, v1-russian-1, v1-indonesian-1, v1-portuguese-0,
>>>> v1-indonesian-6, v1-french-2, v1-english-11, v1-japanese-2, v1-other-2,
>>>> v1-indonesian-2, v1-english-14, v1-portuguese-5, v1-spanish-6, v1-german-1,
>>>> v1-italian-0, v1-english-2, v1-japanese-5, v1-portuguese-4, v1-spanish-0,
>>>> v1-english-3, v1-arabic-3, v1-other-5, v1-korean-2)
>>>> 2013-03-22 04:54:51,808 INFO kafka.producer.SyncProducer: Connected to
>>>> kafka01.qa.viq:9092 for producing
>>>> 2013-03-22 04:54:51,983 INFO kafka.producer.SyncProducer: Disconnecting
>>>> from kafka01.qa.viq:9092
>>>> 2013-03-22 04:54:51,987 INFO kafka.producer.SyncProducer: Connected to
>>>> 100.100.106.91:9092 for producing
>>>> 2013-03-22 14:41:42,650 INFO kafka.consumer.SimpleConsumer: Reconnect
>>>> due to socket error:
>>>> java.net.SocketTimeoutException
>>>>         at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
>>>>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
>>>>         at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
>>>>         at kafka.utils.Utils$.read(Utils.scala:372)
>>>>         at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>>>>         at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>>>>         at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>>>>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>>>>         at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:124)
>>>>         at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:122)
>>>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:161)
>>>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:161)
>>>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:161)
>>>>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:160)
>>>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:160)
>>>>         at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:160)
>>>>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>>>>         at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:159)
>>>>         at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:48)
>>>>         at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:58)
>>>>         at com.visibletechnologies.platform.common.kafka.KafkaReader.initializeIterator(KafkaReader.java:231)
>>>>         at com.visibletechnologies.platform.common.kafka.KafkaReader.read(KafkaReader.java:181)
>>>>         at com.visibletechnologies.platform.katta.krouter.KRouter.doWork(KRouter.java:117)
>>>>         at com.visibletechnologies.framework.servicebase.ServiceBase.start(ServiceBase.java:187)
>>>>         at com.visibletechnologies.platform.katta.krouter.Main.main(Main.java:132)
>>>>
>>>> I don't see any Zookeeper interactions here. It looks like fetch
>>>> requests to the Kafka broker, but maybe I'm missing something.
>>>>
>>>> On Thu, Mar 21, 2013 at 9:16 PM, Jun Rao <jun...@gmail.com> wrote:
>>>>
>>>>> Bob,
>>>>>
>>>>> Currently, the metadata request needs to do at least one ZK read per
>>>>> partition. So the more topics/partitions you have, the longer the request
>>>>> takes. So, you need to increase the request timeout. Try something like
>>>>> 60 * 1000 ms.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jun
>>>>>
>>>>> On Thu, Mar 21, 2013 at 12:46 PM, Bob Jervis <bjer...@gmail.com> wrote:
>>>>>
>>>>>> We are seeing horrible problems. We cannot move data through our 0.8
>>>>>> broker because we are getting socket timeout exceptions and I cannot
>>>>>> figure out what the settings should be. The fetch metadata stuff is
>>>>>> throwing these exceptions, and no matter how I tweak the timeouts, I
>>>>>> still get horrible timeouts and no progress on moving data.
>>>>>>
>>>>>> On test environments where there are only 12 topics there are no
>>>>>> problems.
>>>>>>
>>>>>> When the number of topics goes to ~75, we can't move anything because
>>>>>> the fetch metadata requests time out.
>>>>>>
>>>>>> What can we do to fix this?????????
>>>>>>
>>>>>> I am desperate.
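P.S. For anyone who runs into the same broker-mapping problem described at the top of this message: the essence of the fix is to key the broker map by the ZooKeeper node name rather than by position in the children list, since ZooKeeper makes no guarantee about the order in which getChildren() returns them. Below is a minimal sketch using the plain ZooKeeper client against the 0.8-style /brokers/ids registry; the class and method names are illustrative only, not our actual code.

    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.zookeeper.ZooKeeper;

    // Illustrative sketch: build an explicit brokerId -> registration-data map.
    // The child names under /brokers/ids are the broker ids, so the id must come
    // from the node name, never from the index in the returned list.
    public class BrokerRegistrySketch {
        public static Map<Integer, String> fetchBrokers(ZooKeeper zk) throws Exception {
            Map<Integer, String> brokers = new TreeMap<Integer, String>();
            List<String> ids = zk.getChildren("/brokers/ids", false); // order is arbitrary
            for (String id : ids) {
                byte[] data = zk.getData("/brokers/ids/" + id, false, null);
                // the registration data carries the broker's host/port; JSON parsing omitted here
                brokers.put(Integer.valueOf(id), new String(data, "UTF-8"));
            }
            return brokers;
        }
    }

With a map like this in hand, the client always asks the broker that actually owns the partition, regardless of the order in which ZooKeeper hands back the ids.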