The exception may be caused by a TCP buffer overflow; please see the paper http://os.korea.ac.kr/publication_papers/inter_journal/jhchoi_cn_2007.pdf
Thanks!

2012/8/2 jjian fan <xiaofanhad...@gmail.com>:

Jun:

How about the load capacity of the broker? You can deploy more producer clients to increase the pressure on the broker. In my test we send 300 thousand messages per second to the broker, with a message size of 1024 bytes. In this scenario, these exceptions are seen often.

Thanks!
Jian Fan

2012/8/1 Jun Rao <jun...@gmail.com>:

Jian,

The message format is documented in the Message class and has the following format:

/**
 * A message. The format of an N byte message is the following:
 *
 * If magic byte is 0
 *
 * 1. 1 byte "magic" identifier to allow format changes
 * 2. 4 byte CRC32 of the payload
 * 3. N - 5 byte payload
 *
 * If magic byte is 1
 *
 * 1. 1 byte "magic" identifier to allow format changes
 * 2. 1 byte "attributes" identifier to allow annotations on the message
 *    independent of the version (e.g. compression enabled, type of codec used)
 * 3. 4 byte CRC32 of the payload
 * 4. N - 6 byte payload
 */

The flow is the following:
1. SyncProducer.send serializes a MultiProduceRequest to bytes and writes the bytes to the socket.
2. On the server side:
   2.1 Processor.read reads the bytes off the socket and deserializes them into a MultiProduceRequest.
   2.2 The request is then handled in KafkaRequestHandler.

BTW, I ran your test for a couple of days but couldn't reproduce the exception. In your test, how frequently do you see the exceptions?

Thanks,

Jun

On Wed, Aug 1, 2012 at 6:43 AM, jjian fan <xiaofanhad...@gmail.com> wrote:

Jun:

Can you give more detail on the byte-buffer structure of messages, and on the process of sending and receiving them?

Thanks

Jian Fan

2012/7/31 Jun Rao <jun...@gmail.com>:

Jian,

Thanks for the patch.
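The message layout quoted above can be sketched in Python. This is only an illustration of the documented field widths, not Kafka's actual Scala code; the function names and the big-endian encoding assumption are mine.

```python
import struct
import zlib

MAGIC_V0, MAGIC_V1 = 0, 1

def encode_message(payload: bytes, magic: int = MAGIC_V1, attributes: int = 0) -> bytes:
    """Build an N-byte message per the format above: magic, [attributes], CRC32, payload."""
    crc = zlib.crc32(payload) & 0xFFFFFFFF
    if magic == MAGIC_V0:
        # 1 byte magic + 4 byte CRC32 + (N - 5) byte payload
        return struct.pack(">BI", MAGIC_V0, crc) + payload
    # 1 byte magic + 1 byte attributes + 4 byte CRC32 + (N - 6) byte payload
    return struct.pack(">BBI", MAGIC_V1, attributes, crc) + payload

def decode_message(buf: bytes) -> bytes:
    """Parse a message and verify its CRC, raising on corruption."""
    magic = buf[0]
    if magic == MAGIC_V0:
        (crc,) = struct.unpack(">I", buf[1:5])
        payload = buf[5:]
    elif magic == MAGIC_V1:
        (crc,) = struct.unpack(">I", buf[2:6])
        payload = buf[6:]
    else:
        raise ValueError("unknown magic byte %d" % magic)
    if zlib.crc32(payload) & 0xFFFFFFFF != crc:
        raise ValueError("CRC mismatch: message corrupted")
    return payload
```

Note that the CRC only covers the payload, so a flipped bit in the payload is caught at decode time, which is how the broker detects the corrupted messages discussed in this thread.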
It may not be the right fix, though, since it fixes the symptom but not the cause. For each produce request, the broker does the following: (1) read all bytes of the request into a BoundedByteBufferReceive (SocketServer.read); (2) after all bytes of the request are ready, deserialize the bytes into a ProducerRequest (KafkaRequestHandler.handleProducerRequest); (3) finally, serve the request by adding topic data to the logs.

What you observed is that in step 3 a topic name is somehow corrupted. That means the corresponding ProducerRequest is corrupted. Assuming there is no corruption at the network layer (very unlikely), the corruption must have happened in step 1 or step 2. So, instead of patching a corrupted topic name, we should understand why a ProducerRequest can be corrupted and fix the cause. BTW, whatever caused the corrupted topic could be causing the corrupted messages too.

Thanks,

Jun

On Mon, Jul 30, 2012 at 2:18 AM, jjian fan <xiaofanhad...@gmail.com> wrote:

Jun:

Hi. I found why the error appears. In a highly concurrent environment, the TCP server will drop some packets when the TCP buffer overflows. So there is a chance that "topic" contains one or more characters that encode to bytes that include NUL (0). I have submitted a patch to KAFKA-411, please check it!

Thanks!
Jian Fan

2012/7/30 Jun Rao <jun...@gmail.com>:

Jian,

All log directories in Kafka are created by LogManager.createLog(). As you can see, the directory always has the form topic-partitionId. So it's not clear how a directory named "a" could be created in your case. I will try to rerun your test and see if it can be reproduced.

Thanks,

Jun

On Sat, Jul 28, 2012 at 7:35 PM, jjian fan <xiaofanhad...@gmail.com> wrote:

Jay:

If you try to send 600 thousand messages per second to the broker, you will find that TCP drops packets, so sometimes the topic "ax" becomes "a". I don't mean to solve the TCP problem at the application level; I just think there may be a bug in file.mkdir() in LogManager.createLog. It affects Kafka's usability.

Thanks
Jian Fan

2012/7/29 Jay Kreps <jay.kr...@gmail.com>:

Hmm, that is not my understanding of TCP. TCP is a reliable protocol, so it is supposed to either deliver packets in order or time out retrying. As for the topic name, it is a size-delimited string, so there should be no way to drop a single byte in the middle of the request like that. If that is in fact happening, I don't think it is something we can hope to recover from at the application level...

-Jay

On Fri, Jul 27, 2012 at 9:45 PM, jjian fan <xiaofanhad...@gmail.com> wrote:

Jun:

Dropping packets in TCP is an issue of the OS/JVM, but it can also cause a Kafka issue!
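Jay's point about the size-delimited string can be made concrete with a small sketch. The 2-byte length prefix below is an assumption for illustration; the key property is that because the length travels with the data, a lost byte cannot silently shorten a topic name — the parser either reads the declared number of bytes or fails with a framing error.

```python
import struct

def write_size_delimited_string(s: str) -> bytes:
    """Encode a string as a 2-byte big-endian length followed by its UTF-8 bytes."""
    data = s.encode("utf-8")
    return struct.pack(">H", len(data)) + data

def read_size_delimited_string(buf: bytes, offset: int = 0):
    """Read one size-delimited string; return (string, next_offset).

    If the stream is truncated, we get a loud framing error rather than
    a shorter topic (e.g. "ax" quietly turning into "a").
    """
    if len(buf) < offset + 2:
        raise ValueError("framing error: missing length prefix")
    (size,) = struct.unpack_from(">H", buf, offset)
    end = offset + 2 + size
    if len(buf) < end:
        raise ValueError("framing error: truncated string")
    return buf[offset + 2:end].decode("utf-8"), end
```

So if "ax" ever shows up as "a" on the broker, the framing itself must have been parsed against corrupted bytes, which is consistent with Jun's argument that the corruption happens before step 3.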
For example, the topic of the message is "ax", but it can change to "a" in the broker because some packets are dropped. The log directory should be like a-0, a-1, a-2 and so on, but file.mkdir() creates a log directory like "a". There seem to be some bugs in file.mkdir() in LogManager.createLog. If you shut down the broker and restart it, the broker will report an exception like this:

[2012-07-28 12:43:44,565] INFO Loading log 'a' (kafka.log.LogManager)
[2012-07-28 12:43:44,574] FATAL Fatal error during KafkaServerStable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
java.lang.StringIndexOutOfBoundsException: String index out of range: -1
        at java.lang.String.substring(String.java:1949)
        at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
        at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
        at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
        at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
        at kafka.log.LogManager.<init>(LogManager.scala:65)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
        at kafka.Kafka$.main(Kafka.scala:50)
        at kafka.Kafka.main(Kafka.scala)
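The StringIndexOutOfBoundsException above follows from how the directory name is split: getTopicPartition takes everything after the last '-' as the partition id, so a directory named just "a" makes lastIndexOf('-') return -1 and substring(-1, ...) throw at startup. A hedged Python re-creation of that parse (my own code, not Kafka's), raising a clearer error for a malformed name:

```python
def get_topic_partition(dir_name: str):
    """Split a log directory name "topic-partitionId" into (topic, partition).

    Mirrors the behavior behind Utils.getTopicPartition: the partition id is
    whatever follows the last '-'. A stray directory with no '-' (like "a")
    has no valid split, so we fail with an explicit error instead of an
    index-out-of-range exception.
    """
    sep = dir_name.rfind("-")
    if sep == -1:
        raise ValueError("not a topic-partition directory: %r" % dir_name)
    return dir_name[:sep], int(dir_name[sep + 1:])
```

Using rfind (last separator) also keeps topics that themselves contain dashes intact, e.g. "my-topic-2" splits into ("my-topic", 2).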
2012/7/28 Jun Rao <jun...@gmail.com>:

Jian,

I am not sure I understand this completely. Dropping packets in TCP shouldn't cause corruption in the TCP buffer, right? Is this an issue in Kafka or in the OS/JVM?

Thanks,

Jun

On Fri, Jul 27, 2012 at 8:29 PM, jjian fan <xiaofanhad...@gmail.com> wrote:

Jun:

Yes, if the socket server can't handle the packets quickly enough, the TCP protocol will drop some network packets until the buffer overflows; the corrupted messages also appear in this situation. I ran a systemtap script to observe the packet dropping; you can also type "cat /proc/net/sockstat" to see the TCP memory increase. I debugged the whole Kafka source code to find the bug in file.mkdir() in LogManager.createLog.

Jian Fan

2012/7/27 Jun Rao <jun...@gmail.com>:

Thanks for the finding. Are you saying that this problem is caused by the buffering in the Kafka socket server? How did you figure that out? Is this problem exposed by the same test that caused the corrupted messages in the broker?
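The "cat /proc/net/sockstat" check above can be scripted. This is a small parser sketch of my own (the sample text is a typical sockstat snapshot, not output from this thread's test), pulling out the TCP "mem" value that the thread suggests watching while the broker is under load:

```python
# Sample in the format of /proc/net/sockstat on Linux (illustrative values).
SAMPLE = """\
sockets: used 295
TCP: inuse 9 orphan 0 tw 0 alloc 11 mem 407
UDP: inuse 3 mem 2
"""

def parse_sockstat(text: str) -> dict:
    """Parse /proc/net/sockstat-style text into {protocol: {field: int}}.

    Each line is "Proto: key1 val1 key2 val2 ...". The TCP "mem" field
    (in pages) is what grows when the receive buffers fill up under load.
    """
    stats = {}
    for line in text.splitlines():
        proto, _, rest = line.partition(":")
        fields = rest.split()
        stats[proto] = {k: int(v) for k, v in zip(fields[::2], fields[1::2])}
    return stats
```

On a live Linux broker one would read the real file, e.g. `parse_sockstat(open("/proc/net/sockstat").read())`, and sample it periodically during the load test.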
Thanks,

Jun

On Fri, Jul 27, 2012 at 2:16 AM, jjian fan <xiaofanhad...@gmail.com> wrote:

In a highly concurrent environment, the TCP server will drop some packets when the TCP buffer overflows. Then LogManager.createLog will create a log for a non-existent topic. But one thing is very strange: the log directory should be like a-0, a-1, a-2 and so on, but file.mkdir() creates a log directory like "a". There seems to be a bug in file.mkdir() in LogManager.createLog.

The exception message is:

[2012-07-27 17:08:00,559] INFO create directory /data/kafka/axx-0 (kafka.log.LogManager)
[2012-07-27 17:08:00,561] ERROR Error processing MultiProducerRequest on axx:0 (kafka.server.KafkaRequestHandlers)
java.io.FileNotFoundException: /data/kafka/axx-0/00000000000000000000.kafka (Is a directory)
        at java.io.RandomAccessFile.open(Native Method)
        at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
        at kafka.utils.Utils$.openChannel(Utils.scala:324)
        at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75)
        at kafka.log.Log.loadSegments(Log.scala:144)
        at kafka.log.Log.<init>(Log.scala:116)
        at kafka.log.LogManager.createLog(LogManager.scala:159)
        at kafka.log.LogManager.getOrCreateLog(LogManager.scala:214)
        at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:74)
        at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
        at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
        at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
        at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
        at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
        at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
        at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
        at kafka.network.Processor.handle(SocketServer.scala:296)
        at kafka.network.Processor.read(SocketServer.scala:319)
        at kafka.network.Processor.run(SocketServer.scala:214)
        at java.lang.Thread.run(Thread.java:679)
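The failure mode the thread converges on is that a corrupted topic name reaches createLog and leaves stray directories that later collide with segment-file paths. One defensive option (a hypothetical check of my own, not the KAFKA-411 patch, with an illustrative character set) is to validate the topic name before any mkdir happens:

```python
import re

# Illustrative allowed character set; a NUL or control byte in a topic name
# is a strong sign the ProducerRequest was corrupted in transit or parsing.
TOPIC_RE = re.compile(r"^[A-Za-z0-9._-]+$")

def validate_topic(topic: str) -> str:
    """Reject topic names that could only come from a corrupted request.

    Creating a log directory for such a name would leave debris like the
    stray "a" directory described above, which then breaks both segment-file
    creation and broker startup.
    """
    if not topic:
        raise ValueError("empty topic name")
    if "\x00" in topic:
        raise ValueError("topic name contains NUL byte: corrupted request?")
    if not TOPIC_RE.match(topic):
        raise ValueError("illegal characters in topic name: %r" % topic)
    return topic
```

This matches Jun's position in spirit: the check only refuses to act on a corrupted request; the underlying cause of the corruption in steps 1-2 of request handling still has to be found and fixed.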