Well, interestingly enough, I just checked the logs, and the problem I thought might happen already did. Here it is:

[2011-11-18 09:31:52,255] INFO Deleting log segment 00000000000000016226.kafka from cards_card_1185934476-0 (kafka.log.LogManager)
[2011-11-18 09:31:52,255] WARN Delete failed. (kafka.log.LogManager)
[2011-11-18 09:31:52,255] INFO Deleting log segment 00000000000000000026.kafka from healthCheck1320643480188-0 (kafka.log.LogManager)
[2011-11-18 09:31:52,255] INFO Deleting log segment 00000000000000000028.kafka from healthCheck1319860947508-0 (kafka.log.LogManager)
[2011-11-18 09:31:52,255] ERROR error when processing request topic:cards_card_1185934476, part:0 offset:16226 maxSize:1048576 kafka.common.OffsetOutOfRangeException: offset 16226 is out of range
kafka.common.OffsetOutOfRangeException: offset 16226 is out of range
    at kafka.log.Log$.findRange(Log.scala:47)
    at kafka.log.Log.read(Log.scala:223)
    at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$readMessageSet(KafkaRequestHandlers.scala:125)
    at kafka.server.KafkaRequestHandlers.handleFetchRequest(KafkaRequestHandlers.scala:107)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$2.apply(KafkaRequestHandlers.scala:42)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$2.apply(KafkaRequestHandlers.scala:42)
    at kafka.network.Processor.handle(SocketServer.scala:268)
    at kafka.network.Processor.read(SocketServer.scala:291)
    at kafka.network.Processor.run(SocketServer.scala:202)
    at java.lang.Thread.run(Thread.java:619)
 (kafka.server.KafkaRequestHandlers)

You see the issue? The consumer had previously read messages up to offset 16226. The cleaner came and took out the segment in the directory, so there are no more segments. The consumer then came and asked for offset 16226, and it's now invalid.

I had previously thought this might occur only after a restart, but it appears to happen even without a restart.

On Fri, Nov 18, 2011 at 8:52 AM, Taylor Gautier <tgaut...@tagged.com> wrote:

> Right. I'm talking about the broker.
> Where does it store what is the most recent offset if there are no log
> segments? And no ZK.
>
> On Nov 18, 2011, at 8:50 AM, Jun Rao <jun...@gmail.com> wrote:
>
> > What I described is what happens in the broker. If you use SimpleConsumer,
> > then it's the consumer's responsibility to remember the last offset. The
> > server doesn't store the state for consumers.
> >
> > Thanks,
> >
> > Jun
> >
> > On Fri, Nov 18, 2011 at 8:19 AM, Taylor Gautier <tgaut...@tagged.com> wrote:
> >
> >> how? where is the information kept? If ZK is not around, and it's not on
> >> disk, how is this information passed to the next process after the restart?
> >>
> >> On Fri, Nov 18, 2011 at 8:04 AM, Jun Rao <jun...@gmail.com> wrote:
> >>
> >>> 4) is incorrect. "Last offset" remains to be 'a' even after the data is
> >>> cleaned. So in 5), the offset will be 2 x 'a'. That is, we never recycle
> >>> offsets. They keep increasing.
> >>>
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>> On Fri, Nov 18, 2011 at 7:02 AM, Taylor Gautier <tgaut...@tagged.com>
> >>> wrote:
> >>>
> >>>> I don't use high level consumers - just low level. What I was thinking was
> >>>> the following. Let's assume I have turned off ZK in my setup.
> >>>>
> >>>> 1) Send 1 message to topic A. Kafka creates a directory and log segment
> >>>> for A. The log segment starts at 0. Now, the "last offset" of the topic
> >>>> is a.
> >>>>
> >>>> 2) A consumer reads from topic A the message, and records that the most
> >>>> recent offset in topic A is a.
> >>>>
> >>>> 3) Much time passes, the cleaner runs, and deletes the log segment
> >>>>
> >>>> 4) More time passes, I restart Kafka. Kafka sees the topic A directory,
> >>>> but has no segment file to initialize from. So the "last offset" is
> >>>> considered to be 0.
> >>>>
> >>>> 5) Send 1 message to topic A. Kafka creates a log segment for A starting
> >>>> at 0. The new last offset of the topic is a'.
> >>>>
> >>>> 6) The consumer from step 2 tries to read from Kafka at offset a, but this
> >>>> is now an invalid offset.
> >>>>
> >>>> Does that sound right? I haven't tried this yet, I'm just doing a thought
> >>>> experiment here to try to figure out what would happen.
> >>>>
> >>>> On Thu, Nov 17, 2011 at 11:01 PM, Jun Rao <jun...@gmail.com> wrote:
> >>>>
> >>>>> This is true for the high-level ZK-based consumer.
> >>>>>
> >>>>> Jun
> >>>>>
> >>>>> On Thu, Nov 17, 2011 at 10:59 PM, Inder Pall <inder.p...@gmail.com> wrote:
> >>>>>
> >>>>>> Jun & Taylor,
> >>>>>> would it be right to say that consumers without ZK won't be a viable option
> >>>>>> if you can't handle replay of old messages in your application.
> >>>>>>
> >>>>>> - inder
> >>>>>>
> >>>>>> On Fri, Nov 18, 2011 at 12:27 PM, Jun Rao <jun...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Taylor,
> >>>>>>>
> >>>>>>> When you start a consumer, it always tries to get the last checkpointed
> >>>>>>> offset from ZK. If no offset can be found in ZK, the consumer starts from
> >>>>>>> either the smallest or the largest available offset in the broker.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>>
> >>>>>>> Jun
> >>>>>>>
> >>>>>>> On Thu, Nov 17, 2011 at 9:20 PM, Taylor Gautier <tgaut...@tagged.com>
> >>>>>>> wrote:
> >>>>>>>
> >>>>>>>> hmmm - and if you turn off zookeeper?
> >>>>>>>>
> >>>>>>>> On Thu, Nov 17, 2011 at 9:15 PM, Inder Pall <inder.p...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> The consumer offsets are stored in ZooKeeper by topic and partition.
> >>>>>>>>> That's how in a consumer fail over scenario you don't get duplicate
> >>>>>>>>> messages
> >>>>>>>>>
> >>>>>>>>> - Inder
> >>>>>>>>>
> >>>>>>>>> On Fri, Nov 18, 2011 at 10:33 AM, Taylor Gautier <tgaut...@tagged.com>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi,
> >>>>>>>>>>
> >>>>>>>>>> We've noticed that the cleaner script in Kafka removes empty log segments
> >>>>>>>>>> but not the directories themselves. I am actually wondering something - I
> >>>>>>>>>> always assumed that Kafka could restore the latest offset for existing
> >>>>>>>>>> topics by scanning the log directory for all directories and scanning the
> >>>>>>>>>> directories for log segment files to restore the latest offset.
> >>>>>>>>>>
> >>>>>>>>>> Now this conclusion I have made simply by observation - so it could be
> >>>>>>>>>> entirely wrong.
> >>>>>>>>>>
> >>>>>>>>>> My question is however - if I am right, and the cleaner removes all the log
> >>>>>>>>>> segments for a given topic so that a given topic directory is empty, how
> >>>>>>>>>> does Kafka behave when restarted? How does it know what the next offset
> >>>>>>>>>> should be?
> >>>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> -- Inder
> >>>>>>>>>
> >>>>>> --
> >>>>>> -- Inder
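
The failure discussed in this thread can be illustrated with a minimal Python sketch. It is a toy model, not Kafka's code: `ToyLog`, `OffsetOutOfRange`, and `fetch_or_reset` are hypothetical names, and the model assumes (as the log excerpt suggests, but does not prove) that the cleaner leaves the partition with no segments, so no offset is considered valid any more. The consumer-side fallback reflects the point made in the thread that a low-level consumer has to track and recover its own offset.

```python
class OffsetOutOfRange(Exception):
    """Toy stand-in for the broker's OffsetOutOfRangeException."""


class ToyLog:
    """Hypothetical model of one partition: a list of (base_offset, size) segments."""

    def __init__(self, segments):
        self.segments = sorted(segments)

    def valid_range(self):
        """Return (smallest, largest) fetchable offset, or None if no segments exist."""
        if not self.segments:
            return None
        first_base, _ = self.segments[0]
        last_base, last_size = self.segments[-1]
        return first_base, last_base + last_size

    def read(self, offset):
        """Accept the fetch if the offset falls inside the valid range, else raise."""
        bounds = self.valid_range()
        if bounds is None or not (bounds[0] <= offset <= bounds[1]):
            raise OffsetOutOfRange(f"offset {offset} is out of range")
        return offset

    def clean(self):
        """Assumed cleaner behaviour: every segment is removed."""
        self.segments.clear()


def fetch_or_reset(log, offset):
    """Consumer-side handling: keep the offset if it is valid, else fall back.

    Falls back to the smallest available offset, or to None when the log is
    empty and the caller has to decide what to do next.
    """
    try:
        return log.read(offset)
    except OffsetOutOfRange:
        bounds = log.valid_range()
        return bounds[0] if bounds else None


log = ToyLog([(16226, 0)])          # one empty segment, like 00000000000000016226.kafka
print(fetch_or_reset(log, 16226))   # the remembered offset is still valid here
log.clean()                         # the cleaner removes the only segment
print(fetch_or_reset(log, 16226))   # no segments left, so there is nothing to fall back to
```

In this model the same fetch that succeeds before `clean()` fails afterwards without any restart, which matches the sequence in the log excerpt. Whether the real broker keeps an in-memory notion of the last offset after cleaning is exactly the open question in the thread, and the sketch does not settle it.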