Thanks for the update. Do you still see the corrupted topic name issue after the router issue is fixed?
Thanks,

Jun

On Wed, Aug 8, 2012 at 9:03 PM, jjian fan <xiaofanhad...@gmail.com> wrote:

Jun:

I have located the problem. It was caused by a Cisco router. Under high
load, our Cisco router (a 2960S) drops some packets because of its limited
capacity. So socket.recv() should be fine; we just need to handle the log
directory being corrupted by a topic name containing a null byte in this
scenario.

Jian Fan

2012/8/7 Jun Rao <jun...@gmail.com>:

Thanks for the pointer to the paper. However, the socket buffer overflow
issue mentioned in the paper seems to be a performance issue, not a
correctness issue. That is, whatever bytes socket.recv() gets should not be
corrupted. Is this not true?

Jun

On Fri, Aug 3, 2012 at 6:54 AM, jjian fan <xiaofanhad...@gmail.com> wrote:

The exception may be caused by TCP buffer overflow; please see the paper

http://os.korea.ac.kr/publication_papers/inter_journal/jhchoi_cn_2007.pdf

Thanks!

2012/8/2 jjian fan <xiaofanhad...@gmail.com>:

Jun:

How about the load capacity of the broker's server? You can deploy more
producer clients to increase the pressure on the broker. In my test, we
send 300 thousand messages per second to the broker with a message size of
1024 bytes. In this scenario, these exceptions appear often.

Thanks!
Jian Fan

2012/8/1 Jun Rao <jun...@gmail.com>:

Jian,

The message format is documented in the Message class and has the
following format:

/**
 * A message. The format of an N byte message is the following:
 *
 * If magic byte is 0
 *
 * 1. 1 byte "magic" identifier to allow format changes
 *
 * 2. 4 byte CRC32 of the payload
 *
 * 3. N - 5 byte payload
 *
 * If magic byte is 1
 *
 * 1. 1 byte "magic" identifier to allow format changes
 *
 * 2. 1 byte "attributes" identifier to allow annotations on the message
 * independent of the version (e.g. compression enabled, type of codec used)
 *
 * 3. 4 byte CRC32 of the payload
 *
 * 4. N - 6 byte payload
 */

The flow is the following:
1. SyncProducer.send serializes a MultiProduceRequest to bytes and sends
the bytes to the socket.
2. On the server side:
2.1 Processor.read reads the bytes off the socket and deserializes them
into a MultiProduceRequest.
2.2 The request is then handled in KafkaRequestHandler.

BTW, I ran your test for a couple of days but couldn't reproduce the
exception. In your test, how frequently do you see the exceptions?

Thanks,

Jun

On Wed, Aug 1, 2012 at 6:43 AM, jjian fan <xiaofanhad...@gmail.com> wrote:

Jun:

Can you give more detail on the byte buffer structure of messages, and on
the process of sending and receiving them?

Thanks

Jian Fan

2012/7/31 Jun Rao <jun...@gmail.com>:

Jian,

Thanks for the patch. It may not be the right fix, though, since it fixes
the symptom but not the cause. For each produce request, the broker does
the following: (1) read all bytes of the request into
a BoundedByteBufferReceive (SocketServer.read); (2) after all bytes of the
request are ready, deserialize the bytes into a ProducerRequest
(KafkaRequestHandler.handleProducerRequest); (3) finally, serve the
request by adding the topic data to the logs.

What you observed is that in step 3, a topic name is somehow corrupted.
However, this means that the corresponding ProducerRequest is corrupted.
Assuming there is no corruption at the network layer (very unlikely), the
corruption must have happened in step 1 or step 2. So, instead of patching
a corrupted topic name, we should understand why a ProducerRequest can be
corrupted and fix the cause. BTW, whatever caused the corrupted topic
could be causing the corrupted messages too.

Thanks,

Jun

On Mon, Jul 30, 2012 at 2:18 AM, jjian fan <xiaofanhad...@gmail.com> wrote:

Jun:

Hi. I have found why the error appears. In a highly concurrent
environment, the TCP server drops some packets when the TCP buffer
overflows. So there is a chance that "topic" contains one or more
characters that encode to bytes that include NULL (0).

I have submitted the patch to KAFKA-411; please check it!

Thanks!
Jian Fan

2012/7/30 Jun Rao <jun...@gmail.com>:

Jian,

All log directories in Kafka are created by LogManager.createLog(). As you
can see, the directory always has the form topic-partitionId. So it's not
clear how a directory named "a" can be created in your case. I will try to
rerun your test and see if it can be reproduced.

Thanks,

Jun

On Sat, Jul 28, 2012 at 7:35 PM, jjian fan <xiaofanhad...@gmail.com> wrote:

Jay:

You can try sending 600 thousand messages per second to the broker; you
will find that TCP drops packets, so sometimes the topic ax becomes a. I
don't mean to solve the TCP problem at the application level; I just think
there may be a bug in file.mkdir() in LogManager.createLog. It affects
Kafka's usability.

Thanks
Jian Fan

2012/7/29 Jay Kreps <jay.kr...@gmail.com>:

Hmm, that is not my understanding of TCP. TCP is a reliable protocol, so
it is supposed to either deliver packets in order or time out retrying.
In the case of the topic name, which is a size-delimited string, there
should be no way for it to drop a single byte in the middle of the request
like that. If that is in fact happening, I don't think it is something we
can hope to recover from at the application level...

-Jay

On Fri, Jul 27, 2012 at 9:45 PM, jjian fan <xiaofanhad...@gmail.com> wrote:

Jun:

Dropping packets in TCP is an OS/JVM issue, but it can also cause Kafka
issues! For example, the topic of a message is ax, but it can change to a
in the broker because some packets are dropped. The log directories should
look like a-0, a-1, a-2 and so on, but file.mkdir() creates a log
directory named just a. Seems there are some bugs in file.mkdir() in
LogManager.createLog. If you shut down the broker and restart it, the
broker will report an exception like this:

[2012-07-28 12:43:44,565] INFO Loading log 'a' (kafka.log.LogManager)
[2012-07-28 12:43:44,574] FATAL Fatal error during KafkaServerStable
startup. Prepare to shutdown (kafka.server.KafkaServerStartable)
java.lang.StringIndexOutOfBoundsException: String index out of range: -1
        at java.lang.String.substring(String.java:1949)
        at kafka.utils.Utils$.getTopicPartition(Utils.scala:558)
        at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:71)
        at kafka.log.LogManager$$anonfun$4.apply(LogManager.scala:65)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
        at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
        at kafka.log.LogManager.<init>(LogManager.scala:65)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:58)
        at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:34)
        at kafka.Kafka$.main(Kafka.scala:50)
        at kafka.Kafka.main(Kafka.scala)

2012/7/28 Jun Rao <jun...@gmail.com>:

Jian,

I am not sure I understand this completely. Dropping packets in TCP
shouldn't cause corruption in the TCP buffer, right? Is this an issue in
Kafka or in the OS/JVM?

Thanks,

Jun

On Fri, Jul 27, 2012 at 8:29 PM, jjian fan <xiaofanhad...@gmail.com> wrote:

Jun:

Yes, if the socket server can't handle the packets quickly enough, the TCP
stack will drop some network packets once the buffer overflows; the
corrupted messages also appear in this situation! I ran a systemtap script
to detect the packet dropping; you can also run "cat /proc/net/sockstat"
to see the TCP memory increase. I debugged the whole Kafka source code to
find the bug in file.mkdir() in LogManager.createLog.

Jian Fan

2012/7/27 Jun Rao <jun...@gmail.com>:

Thanks for the finding. Are you saying that this problem is caused by the
buffering in the Kafka socket server? How did you figure that out? Is this
problem exposed by the same test that caused the corrupted messages in the
broker?

Thanks,

Jun

On Fri, Jul 27, 2012 at 2:16 AM, jjian fan <xiaofanhad...@gmail.com> wrote:

In a highly concurrent environment, the TCP server will drop some packets
when the TCP buffer overflows. Then LogManager.createLog will create a log
for a topic that should not exist. But one thing is very strange: the log
directory should look like a-0, a-1, a-2 and so on, but file.mkdir()
creates a log directory named just a. Seems there is some bug in
file.mkdir() in LogManager.createLog.
The exception message is:

[2012-07-27 17:08:00,559] INFO create directory /data/kafka/axx-0 (kafka.log.LogManager)
[2012-07-27 17:08:00,561] ERROR Error processing MultiProducerRequest on axx:0 (kafka.server.KafkaRequestHandlers)
java.io.FileNotFoundException: /data/kafka/axx-0/00000000000000000000.kafka (Is a directory)
        at java.io.RandomAccessFile.open(Native Method)
        at java.io.RandomAccessFile.<init>(RandomAccessFile.java:233)
        at kafka.utils.Utils$.openChannel(Utils.scala:324)
        at kafka.message.FileMessageSet.<init>(FileMessageSet.scala:75)
        at kafka.log.Log.loadSegments(Log.scala:144)
        at kafka.log.Log.<init>(Log.scala:116)
        at kafka.log.LogManager.createLog(LogManager.scala:159)
        at kafka.log.LogManager.getOrCreateLog(LogManager.scala:214)
        at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:74)
        at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
        at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
        at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
        at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
        at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
        at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
        at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
        at kafka.network.Processor.handle(SocketServer.scala:296)
        at kafka.network.Processor.read(SocketServer.scala:319)
        at kafka.network.Processor.run(SocketServer.scala:214)
        at java.lang.Thread.run(Thread.java:679)
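[Editor's note] The two header layouts Jun quoted earlier in the thread (magic byte 0 and magic byte 1) can be sketched as a small encoder/decoder. This is a sketch based only on the format comment posted in the thread, not the actual kafka.message.Message code; MessageSketch and the exception it throws are made-up names.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

public class MessageSketch {
    // Compute the CRC32 of a payload as an unsigned 32-bit value.
    static long crcOf(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return crc.getValue();
    }

    // Magic 0 layout: 1 byte magic, 4 byte CRC32, N - 5 byte payload.
    static ByteBuffer encodeMagic0(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 4 + payload.length);
        buf.put((byte) 0);
        buf.putInt((int) crcOf(payload));
        buf.put(payload);
        buf.flip();
        return buf;
    }

    // Magic 1 layout: 1 byte magic, 1 byte attributes, 4 byte CRC32, payload.
    static ByteBuffer encodeMagic1(byte attributes, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 1 + 4 + payload.length);
        buf.put((byte) 1);
        buf.put(attributes);
        buf.putInt((int) crcOf(payload));
        buf.put(payload);
        buf.flip();
        return buf;
    }

    // Parse either layout and verify the CRC; return the payload or throw.
    static byte[] decode(ByteBuffer buf) {
        byte magic = buf.get();
        if (magic == 1) {
            buf.get(); // the "attributes" byte only exists when magic == 1
        }
        long storedCrc = buf.getInt() & 0xffffffffL;
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        if (crcOf(payload) != storedCrc) {
            throw new IllegalStateException("CRC mismatch: payload corrupted");
        }
        return payload;
    }
}
```

A flipped payload byte surfaces as a CRC mismatch on decode, which is why Jun argues that random corruption of message bytes should normally be caught. The topic name, however, travels in the (Multi)ProducerRequest header outside any message, so a per-message CRC like this one would presumably not protect it.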
> > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > > > > > > > > > > > >