Jun:

I have located the problem. It was caused by our Cisco router (2960S). Under high load, the router drops some packets because of its limited capacity. So socket.recv() should be fine; we just need to handle the log directory being corrupted by a topic name containing a null byte in this scenario.
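Since the conclusion above is that a corrupted topic name with an embedded null byte ends up being used as a log directory name, a minimal sketch of the kind of validation being discussed might look like the following. This is a hypothetical helper for illustration, not the actual KAFKA-411 patch:

```java
// Hypothetical validation for a topic name decoded from a request buffer.
// A topic that arrives empty or with an embedded NUL byte is rejected
// before it can be used to create a log directory on disk.
public final class TopicNameValidator {
    public static boolean isValid(String topic) {
        if (topic == null || topic.isEmpty()) {
            return false;
        }
        // A NUL character in the name indicates the request bytes were corrupted.
        return topic.indexOf('\0') < 0;
    }

    public static void main(String[] args) {
        System.out.println(isValid("ax"));     // a normal topic name
        System.out.println(isValid("a\0x"));   // corrupted: embedded NUL
    }
}
```

Rejecting the request up front keeps a corrupted name from ever reaching file.mkdir().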
Jian Fan

2012/8/7 Jun Rao <jun...@gmail.com>:

Thanks for the pointer to the paper. However, the socket buffer overflow issue mentioned in the paper seems to be a performance issue, not a correctness issue. That is, whatever bytes socket.recv() gets should not be corrupted. Is this not true?

Jun

On Fri, Aug 3, 2012 at 6:54 AM, jjian fan <xiaofanhad...@gmail.com> wrote:

The exception may be caused by a TCP buffer overflow; please check the paper:
http://os.korea.ac.kr/publication_papers/inter_journal/jhchoi_cn_2007.pdf

Thanks!

2012/8/2 jjian fan <xiaofanhad...@gmail.com>:

Jun:

How about the processing power of the broker? You can deploy more producer clients to increase the pressure on the broker. In my test, we send 300 thousand messages per second to the broker, with a message size of 1024 bytes. In this scenario, these exceptions are seen often.

Thanks!
Jian Fan

2012/8/1 Jun Rao <jun...@gmail.com>:

Jian,

The message format is documented in the Message class and has the following format.

    /**
     * A message. The format of an N byte message is the following:
     *
     * If magic byte is 0
     *
     * 1. 1 byte "magic" identifier to allow format changes
     * 2. 4 byte CRC32 of the payload
     * 3. N - 5 byte payload
     *
     * If magic byte is 1
     *
     * 1. 1 byte "magic" identifier to allow format changes
     * 2. 1 byte "attributes" identifier to allow annotations on the message
     *    independent of the version (e.g. compression enabled, type of codec used)
     * 3. 4 byte CRC32 of the payload
     * 4. N - 6 byte payload
     */

The flow is the following:
1. SyncProducer.send serializes a MultiProduceRequest to bytes and sends the bytes to the socket.
2. On the server side:
   2.1 Processor.read reads the bytes off the socket and deserializes them into a MultiProduceRequest.
   2.2 The request is then handled in KafkaRequestHandler.

BTW, I ran your test for a couple of days but couldn't reproduce the exception. In your test, how frequently do you see the exceptions?

Thanks,

Jun

On Wed, Aug 1, 2012 at 6:43 AM, jjian fan <xiaofanhad...@gmail.com> wrote:

Jun:

Can you give more detail on the byte-buffer structure of messages, and on the process of sending and receiving them?

Thanks

Jian Fan

2012/7/31 Jun Rao <jun...@gmail.com>:

Jian,

Thanks for the patch. It may not be the right fix though, since it fixes the symptom but not the cause. For each produce request, the broker does the following: (1) read all bytes of the request into a BoundedByteBufferReceive (SocketServer.read); (2) after all bytes of the request are ready, deserialize the bytes into a ProducerRequest (KafkaRequestHandler.handleProducerRequest); (3) finally, serve the request by adding topic data to the logs.

What you observed is that in step 3, a topic name is somehow corrupted. However, this means that the corresponding ProducerRequest is corrupted. Assuming there is no corruption at the network layer (very unlikely), the corruption must have happened in step 1 or step 2. So, instead of patching a corrupted topic name, we should understand why a ProducerRequest can be corrupted and fix the cause.
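The magic-1 layout Jun describes above can be sketched in code. The following is an illustrative re-implementation in Java, not Kafka's actual Message class, showing how the CRC32 over the payload lets a reader detect a corrupted message body:

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Illustrative encoder/checker for the magic-1 layout described above:
// 1 byte magic, 1 byte attributes, 4 byte CRC32 of the payload, then the payload.
public final class MessageSketch {
    static ByteBuffer encode(byte attributes, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 1 + 4 + payload.length);
        buf.put((byte) 1);            // magic
        buf.put(attributes);          // attributes (e.g. compression codec)
        CRC32 crc = new CRC32();
        crc.update(payload);
        buf.putInt((int) crc.getValue());
        buf.put(payload);
        buf.flip();
        return buf;
    }

    static boolean isValid(ByteBuffer msg) {
        ByteBuffer buf = msg.duplicate();
        buf.get();                    // skip magic
        buf.get();                    // skip attributes
        int storedCrc = buf.getInt();
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        CRC32 crc = new CRC32();
        crc.update(payload);
        return storedCrc == (int) crc.getValue();
    }

    public static void main(String[] args) {
        ByteBuffer msg = encode((byte) 0, "hello".getBytes());
        System.out.println(isValid(msg));          // true: intact message
        msg.put(6, (byte) (msg.get(6) ^ 0xFF));    // flip one payload byte
        System.out.println(isValid(msg));          // false: CRC mismatch
    }
}
```

Note the CRC covers only the payload, which is why a corrupted topic name elsewhere in the request would not be caught by this check.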
BTW, whatever caused the corrupted topic could be causing the corrupted messages too.

Thanks,

Jun

On Mon, Jul 30, 2012 at 2:18 AM, jjian fan <xiaofanhad...@gmail.com> wrote:

Jun:

Hi. I have found why the error appears. In a highly concurrent environment, the TCP server will drop some packets when the TCP buffer overflows. So there is a chance that "topic" contains one or more characters that encode to bytes that include NULL (0). I have submitted the patch to KAFKA-411; please check it!

Thanks!
Jian Fan

2012/7/30 Jun Rao <jun...@gmail.com>:

Jian,

All log directories in Kafka are created by LogManager.createLog(). As you can see, the directory always has the form topic-partitionId. So it's not clear how a directory named "a" can be created in your case. I will try to rerun your test and see if it can be reproduced.

Thanks,

Jun

On Sat, Jul 28, 2012 at 7:35 PM, jjian fan <xiaofanhad...@gmail.com> wrote:

Jay:

You can try to send 600 thousand messages per second to the broker; you will find that TCP drops packets, so sometimes the topic "ax" becomes "a". I don't mean to solve the TCP problem at the application level; I just think there may be a bug in file.mkdir() in LogManager.createLog. It will affect Kafka usage.

Thanks
Jian Fan

2012/7/29 Jay Kreps <jay.kr...@gmail.com>:

Hmm, that is not my understanding of TCP. TCP is a reliable protocol, so it is supposed to either deliver packets in order or time out retrying. In the case of the topic name, which is a size-delimited string, there should be no way for it to drop a single byte in the middle of the request like that. If that is in fact happening, I don't think it is something we can hope to recover from at the application level...

-Jay

On Fri, Jul 27, 2012 at 9:45 PM, jjian fan <xiaofanhad...@gmail.com> wrote:

Jun:

Dropping packets in TCP is an issue of the OS/JVM, but it can also cause Kafka issues! For example, the topic of a message is "ax", but it can change to "a" in the broker because some packets are dropped. So the log directories should look like a-0, a-1, a-2 and so on, but file.mkdir() creates a log directory named just "a".
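Jay's point about size-delimited strings can be made concrete. The topic is length-prefixed on the wire, so a lost payload byte does not shorten the topic; instead the parser reads one byte belonging to the next field, which is exactly how "ax" can come out as "a" followed by a NUL. A toy Java demonstration, assuming a 2-byte length prefix (an illustration, not the exact wire code):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Toy demonstration of a length-prefixed string with a 2-byte size field.
// If one payload byte disappears in transit, the declared length no longer
// matches, so the parser consumes a byte belonging to the next field.
public final class ShortStringDemo {
    static ByteBuffer writeShortString(String s, int nextField) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(2 + bytes.length + 4);
        buf.putShort((short) bytes.length);
        buf.put(bytes);
        buf.putInt(nextField);   // e.g. a partition id following the topic
        buf.flip();
        return buf;
    }

    static String readShortString(ByteBuffer buf) {
        short len = buf.getShort();
        byte[] bytes = new byte[len];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer intact = writeShortString("ax", 7);
        System.out.println(readShortString(intact));   // "ax"
        System.out.println(intact.getInt());           // 7

        // Simulate one payload byte lost in transit: the length field still
        // says 2, so the parser steals a byte from the next field.
        ByteBuffer dropped = ByteBuffer.allocate(2 + 1 + 4);
        dropped.putShort((short) 2);                   // length unchanged
        dropped.put((byte) 'a');                       // the 'x' was lost
        dropped.putInt(7);
        dropped.flip();
        String topic = readShortString(dropped);
        System.out.println(topic.length());            // 2: "a" plus a stolen byte
        System.out.println((int) topic.charAt(1));     // 0: the NUL seen in the logs
    }
}
```

This matches the symptom reported in the thread: the parsed topic is "a" with a trailing NUL byte, and every field after it is misaligned as well.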
There seem to be some bugs in file.mkdir() in LogManager.createLog. If you shut down the broker and restart it, the broker will report an exception like this:

    [2012-07-28 12:43:44,565] INFO Loading log 'a' (kafka.log.LogManager)
    [2012-07-28 12:43:44,574] FATAL Fatal error during KafkaServerStable startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
    java.lang.StringIndexOutOfBoundsException: String index out of range: -1
        at java.lang.String.substring(String.java:1949)
        at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
        at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
        at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
        at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
        at kafka.log.LogManager.<init>(LogManager.scala:65)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
        at kafka.Kafka$.main(Kafka.scala:50)
        at kafka.Kafka.main(Kafka.scala)

2012/7/28 Jun Rao <jun...@gmail.com>:

Jian,

I am not sure I understand this completely. Dropping packets in TCP shouldn't cause corruption in the TCP buffer, right? Is this an issue in Kafka or in the OS/JVM?

Thanks,

Jun

On Fri, Jul 27, 2012 at 8:29 PM, jjian fan <xiaofanhad...@gmail.com> wrote:

Jun:

Yes, if the socket server can't handle the packets quickly enough, the TCP protocol will drop some network packets until the buffer overflows; the corrupted messages also appear in this situation. I ran a systemtap script to find the packet dropping; you can also type "cat /proc/net/sockstat" to see the TCP memory increase. I debugged the whole Kafka source code to find the bug in file.mkdir() in LogManager.createLog.

Jian Fan

2012/7/27 Jun Rao <jun...@gmail.com>:

Thanks for the finding. Are you saying that this problem is caused by the buffering in the Kafka socket server? How did you figure that out? Is this problem exposed by the same test that caused the corrupted messages in the broker?

Thanks,

Jun

On Fri, Jul 27, 2012 at 2:16 AM, jjian fan <xiaofanhad...@gmail.com> wrote:

In a highly concurrent environment, the TCP server will drop some packets when the TCP buffer overflows. Then LogManager.createLog will create a log for a nonexistent topic. But one thing is very strange: the log directories should look like a-0, a-1, a-2 and so on, but file.mkdir() creates a log directory like "a". There seems to be some bug in file.mkdir() in LogManager.createLog.
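The StringIndexOutOfBoundsException in the startup trace above comes from splitting a log directory name into (topic, partition). A sketch of that parse, modeled on kafka.utils.Utils.getTopicPartition rather than copied from it, shows why a directory named just "a", with no "-partition" suffix, aborts startup, and how a guard would reject it instead:

```java
// Sketch of parsing a log directory name like "a-0" into topic and partition.
// lastIndexOf('-') returns -1 for a name like "a", and substring(0, -1) then
// throws StringIndexOutOfBoundsException, which is the startup failure above.
public final class TopicPartitionParser {
    static String[] getTopicPartition(String dirName) {
        int index = dirName.lastIndexOf('-');
        // Without this guard, substring(0, -1) throws and broker startup dies.
        if (index < 0) {
            throw new IllegalArgumentException("Not a topic-partition dir: " + dirName);
        }
        return new String[] { dirName.substring(0, index), dirName.substring(index + 1) };
    }

    public static void main(String[] args) {
        String[] tp = getTopicPartition("a-0");
        System.out.println(tp[0] + " / " + tp[1]);   // a / 0
        try {
            getTopicPartition("a");                  // corrupted directory name
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

As Jun argues in the thread, a guard like this only softens the symptom; the corrupted directory name should never have been created in the first place.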
The exception message is:

    [2012-07-27 17:08:00,559] INFO create directory /data/kafka/axx-0 (kafka.log.LogManager)
    [2012-07-27 17:08:00,561] ERROR Error processing MultiProducerRequest on axx:0 (kafka.server.KafkaRequestHandlers)
    java.io.FileNotFoundException: /data/kafka/axx-0/00000000000000000000.kafka (Is a directory)
        at java.io.RandomAccessFile.open(Native Method)
        at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
        at kafka.utils.Utils$.openChannel(Utils.scala:324)
        at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75)
        at kafka.log.Log.loadSegments(Log.scala:144)
        at kafka.log.Log.<init>(Log.scala:116)
        at kafka.log.LogManager.createLog(LogManager.scala:159)
        at kafka.log.LogManager.getOrCreateLog(LogManager.scala:214)
        at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:74)
        at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
        at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
        at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
        at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
        at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
        at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
        at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
        at kafka.network.Processor.handle(SocketServer.scala:296)
        at kafka.network.Processor.read(SocketServer.scala:319)
        at kafka.network.Processor.run(SocketServer.scala:214)
        at java.lang.Thread.run(Thread.java:679)
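The "(Is a directory)" failure in the trace above can be reproduced in isolation: RandomAccessFile refuses to open a path that already exists as a directory, which is what happens when the corrupted topic leaves a directory where the segment file is expected. A small sketch (file names are illustrative):

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;

// Reproduces the "(Is a directory)" failure in isolation: opening a
// RandomAccessFile on a path that exists as a directory throws
// FileNotFoundException, just as in the broker log above.
public final class IsADirectoryDemo {
    // Returns true if the path can be opened as a random-access file.
    static boolean opensAsFile(File f) {
        try (RandomAccessFile raf = new RandomAccessFile(f, "rw")) {
            return true;
        } catch (IOException e) {
            return false;   // e.g. FileNotFoundException: "... (Is a directory)"
        }
    }

    public static void main(String[] args) throws IOException {
        File base = Files.createTempDirectory("kafka-demo").toFile();
        File segment = new File(base, "00000000000000000000.kafka");
        segment.mkdir();   // a directory where a segment *file* is expected
        System.out.println(opensAsFile(segment));   // false: it is a directory
        File fresh = new File(base, "00000000000000000001.kafka");
        System.out.println(opensAsFile(fresh));     // true: created as a file
    }
}
```

This is why the broker cannot recover once the bad directory exists: every subsequent produce to that partition hits the same exception until the directory is removed.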