Ok, here's a thought from my colleague: the index we need for this can mostly be implemented using Kafka itself.

For every write of a message to topic foo, write the new offset into topic foo_i. Now you have a topic that consists of fixed-length messages (offsets), which essentially supports random access through the existing Kafka API. Computing the position of the nth message in the index is therefore trivial, and that message gives you the original offset to use in the original topic.

The only trouble with doing this outside of Kafka (say, in a special producer) is that, as far as I can tell, producers don't get told the offsets of the messages they wrote. Even if they did, with more than one producer the index would end up out of order.

On Wed, Sep 21, 2011 at 7:32 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> >> but I also would like to have a way to go back say 1,000 messages.
>
> Given the implementation we have today, that would be hard to implement in
> the absence of an index. Each message in a kafka log segment could be of a
> different size. Given that you are at some reference point in the segment
> and you want to go back "n" messages, there is no way to know what offset
> that might be. Even if you decide to do some kind of binary search on the
> offsets, it would be hard to guess how far away you are from the reference
> point.
>
> The binary search approach would work if we kept an index per log segment,
> that tracks every nth offset and the number of messages since the previous
> offset key.
>
> I'm not aware of any JIRA tracking this feature request. Could you file one?
>
> Thanks,
> Neha
>
> On Wed, Sep 21, 2011 at 7:06 PM, Taylor Gautier <tgaut...@tagged.com>
> wrote:
>
> > I see (and I think I understand why it works this way).
> >
> > I see that kafka-87 addresses this with a request for having a time based
> > index, this would be relatively useful, but I also would like to have a way
> > to go back say 1,000 messages.
> > Other than walking backwards one segment at a time, and then scanning
> > forward from there, do you have any suggestions how this might be done
> > or is it also a feature request?
> >
> > On Wed, Sep 21, 2011 at 5:14 PM, Joel Koshy <jjko...@gmail.com> wrote:
> >
> > > Hi Taylor,
> > >
> > > This is an FAQ, that was asked some time ago as well:
> > >
> > > http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201108.mbox/%3ccafbh0q3qgsyxn4kjtth0zz9djz7j14tm5g24rfjsihffbqd...@mail.gmail.com%3E
> > >
> > > GetOffsetShell doesn't return the offset of every message. It returns the
> > > offset of the first message in every segment file. If you provide a time,
> > > the offsets you get back are based on the last modified time of the
> > > segment files.
> > >
> > > It may be a good idea to paraphrase the above to be displayed with the
> > > tool's help message.
> > >
> > > Thanks,
> > >
> > > Joel
> > >
> > > On Wed, Sep 21, 2011 at 4:18 PM, Taylor Gautier <tgaut...@tagged.com>
> > > wrote:
> > > > Hi,
> > > >
> > > > Using Kafka 0.6.
> > > >
> > > > I'm trying to use get offsets but it doesn't seem to work as I expect. I
> > > > have a test topic that has some messages in it.
> > > > Here's the output of a test client that starts from offset 0 and
> > > > prints all messages/offsets for the topic:
> > > >
> > > > Consumed message:foo offset: 12
> > > > Consumed message:bar offset: 24
> > > > Consumed message:foo offset: 36
> > > > Consumed message:bar offset: 48
> > > > Consumed message:hello offset: 62
> > > > Consumed message:world offset: 76
> > > >
> > > > Here's a class to print the last n offsets:
> > > >
> > > > public class SimpleConsumerDemo
> > > > {
> > > >   public static void main(String[] args)
> > > >   {
> > > >     SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", 9092, 1000, 1024);
> > > >
> > > >     long[] offsets = simpleConsumer.getOffsetsBefore("test", 0, -1L, 1);
> > > >
> > > >     for (long l : offsets) {
> > > >       System.out.println("offset: " + l);
> > > >     }
> > > >   }
> > > > }
> > > >
> > > > Running this as above, with 1 offset, yields expected results:
> > > >
> > > > ----output-----
> > > > offset: 76
> > > >
> > > > However, asking for 3 offsets yields unexpected results:
> > > >
> > > > change to: long[] offsets = simpleConsumer.getOffsetsBefore("test", 0, -1L, 3);
> > > >
> > > > ----output-----
> > > > offset: 76
> > > > offset: 0
> > > >
> > > > I expected:
> > > >
> > > > ----output-----
> > > > offset: 76
> > > > offset: 62
> > > > offset: 48
> > > >
> > > > Any idea why I did not get what I was looking for/what I am doing
> > > > wrong?
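
P.S. To make the foo_i idea from the top of this message a bit more concrete, here is a rough in-memory sketch. It is not Kafka code and uses no Kafka API: the class and method names (OffsetIndexSketch, append, offsetAt, offsetFromEnd) are made up for illustration, the "index topic" is just a byte buffer of fixed 8-byte entries, and it assumes a single writer so the entries stay in order. The message sizes are chosen to reproduce the offsets from the test topic in the original mail.

```java
import java.nio.ByteBuffer;

/** Hypothetical in-memory stand-in for a companion index topic such as foo_i. */
public class OffsetIndexSketch {
    // Every index entry is one 8-byte offset, so entry n starts at byte n * ENTRY_SIZE.
    private static final int ENTRY_SIZE = Long.BYTES;

    private ByteBuffer index = ByteBuffer.allocate(ENTRY_SIZE * 16);
    private long logEnd = 0L; // byte offset just past the last message in the main log

    /** Simulates writing a message of the given size and recording the new offset. */
    public long append(int messageSizeBytes) {
        logEnd += messageSizeBytes;
        if (index.remaining() < ENTRY_SIZE) {
            // Grow the backing buffer; a real log would simply keep appending.
            ByteBuffer bigger = ByteBuffer.allocate(index.capacity() * 2);
            index.flip();
            bigger.put(index);
            index = bigger;
        }
        index.putLong(logEnd);
        return logEnd;
    }

    /** Number of offsets recorded so far. */
    public int count() {
        return index.position() / ENTRY_SIZE;
    }

    /** Random access: the offset recorded for the n-th write (0-based). */
    public long offsetAt(int n) {
        if (n < 0 || n >= count()) {
            throw new IndexOutOfBoundsException("no index entry " + n);
        }
        return index.getLong(n * ENTRY_SIZE); // absolute read at a computed position
    }

    /** The offset recorded 'back' writes before the most recent one. */
    public long offsetFromEnd(int back) {
        return offsetAt(count() - 1 - back);
    }

    public static void main(String[] args) {
        OffsetIndexSketch idx = new OffsetIndexSketch();
        // Sizes that yield the offsets 12, 24, 36, 48, 62, 76 seen in the test topic.
        int[] sizes = {12, 12, 12, 12, 14, 14};
        for (int size : sizes) {
            idx.append(size);
        }
        for (int back = 0; back < 3; back++) {
            System.out.println("offset: " + idx.offsetFromEnd(back));
        }
    }
}
```

Running main prints offsets 76, 62 and 48, which is the "last 3 offsets" answer the original question was hoping for. Going back 1,000 messages would be the same lookup with a larger argument, provided the index really is written in the same order as the main topic.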