Thanks Jay. Sounds pretty good, though it seems you will likely be trading at
least one IOP for the index lookup, unless you can store all indexes in
memory, which at a certain size may not be feasible. I'm just guessing,
though....
It seems using this new methodology there is no longer a need for Tagged's
special Kafka branch, as reverse indexing (m messages ago) is now natively
supported...that's nice. And in my new job I expect I may want to use Kafka
as a commit log in a very similar way, so the functionality you describe
sounds useful.

On Mon, Oct 8, 2012 at 1:59 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> Hi Taylor,
>
> These are good questions.
>
> 1. It is required. It seemed cleaner to have one mechanism for mapping
> the "log change number" to the physical message and make that efficient,
> rather than have lots of options.
>
> 2. Zero-copy transfer is still used. The performance impact can be
> understood in two pieces: (1) finding the chunk of log you need, and (2)
> transferring that chunk. This change only affects (1). There is some
> performance impact for (1), but I think it can be made fast enough that
> it will not be significant. My next task is to do some performance work
> on the log system, and I will quantify the impact more then.
>
> 3. This is a good question. First you asked if this is a special case of
> general indexing. It is. But because indexing monotonically increasing,
> fixed-width numerical offsets can be done much more efficiently, it is an
> important special case. The idea of creating more general indexes seems
> appealing at first, but when you think about it more you realize you need
> a query language, a data model, and many other things for this to be
> useful; essentially you would be implementing a database, which isn't
> what Kafka is meant to be.
>
> Here is what this adds right off the bat with the patch I checked in:
> a. It is aesthetically nice. The first message will have offset 0, the
> second message offset 1, the 100th message offset 100, etc.
> b. You can read the messages in reverse if you like. If the end of the
> log is 9876, then 100 messages before that is 9776.
> c. It is less error prone: there are no invalid offsets and no byte
> arithmetic.
> d.
> It fixes the commit() functionality with respect to compressed messages.
> Previously there was effectively no offset for messages inside of a
> compressed set, so one could only commit one's position at compressed
> message set boundaries. This made the semantics of compressed messages
> very problematic.
>
> One of the primary motivators is not the above items, but rather the
> ability to allow more sophisticated retention policies. Some systems at
> LinkedIn use Kafka as a kind of commit log: they take a stream of changes
> from Kafka, process them, and apply some munged version of this to a
> local search index, key-value store, or other data structure for serving.
> Virtually all of these systems have some notion of a primary key for
> records. The general problem these systems have to solve is what to do if
> they need to recreate their local state (say, if a node fails, or they
> need to reprocess data in a different way). Since Kafka will only contain
> a fixed range of data, they can't really re-process from Kafka unless the
> data they serve is also time-based, as we will have cleaned out old
> messages. But you could imagine a slightly different retention strategy
> in Kafka that allowed you to retain messages by some primary key: rather
> than throwing away old segments, you would have the option to "clean" old
> segments and retain just the latest record for each primary key. That
> would allow using the Kafka log for all restore functionality and still
> guarantee that you restored the latest value for each key. This retention
> strategy would only make sense for topics that contain data with a
> primary key, so it would be optional. I think this is actually very
> powerful when combined with replication, because it is a way to get a
> highly available "commit" or "restore" log.
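For what it's worth, the "clean old segments, keep only the latest record
per key" idea Jay describes above can be sketched in a few lines. This is
just an illustration of the retention semantics, not Kafka's actual code;
all names and the (offset, key, value) record shape are made up here:

```python
# Sketch of key-based log cleaning: given a log of (offset, key, value)
# records, cleaning keeps only the newest record for each key, so replaying
# the cleaned log still restores the latest value per key.

def clean(log):
    """Return the log with only the last record per key, in offset order."""
    latest = {}  # key -> record; later records overwrite earlier ones
    for record in log:
        offset, key, value = record
        latest[key] = record
    return sorted(latest.values())  # records are (offset, ...), so offset order

def replay(records):
    """Apply records to an empty key-value store and return the final state."""
    state = {}
    for _, key, value in records:
        state[key] = value
    return state

log = [(0, "a", 1), (1, "b", 2), (2, "a", 3), (3, "c", 4), (4, "b", 5)]
cleaned = clean(log)
# Replaying either the full or the cleaned log yields the same final state,
# which is exactly the "restore" guarantee described.
assert replay(cleaned) == replay(log)
```

The point is that the cleaned log is strictly smaller but restore-equivalent,
which is why it only makes sense for topics whose records carry a primary key.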
>
> -Jay
>
> On Mon, Oct 8, 2012 at 12:40 PM, Taylor Gautier <tgaut...@gmail.com>
> wrote:
>
> > I seem to be late to the party on this one - can you summarize what
> > happened to the log files - and wire protocol - as a result of this
> > change?
> >
> > I have some questions regarding what implications this change has:
> >
> > 1) Is it required or optional? In other words, can the physical offset
> > mechanism still be used?
> > 2) If required, is it going to change the ability of Kafka to do
> > ultra-efficient I/O? (Currently, near zero memory is used; sendfile
> > keeps things fast and tidy.)
> > 3) What additional capabilities does this give - for example, can one
> > do negative indexing and positive indexing from a given offset?
> >
> > I am wondering, is this a specific instance of a more general class of
> > indexing?
> >
> > By negative indexing, I mean, for example, that at Tagged we wanted the
> > capability to retrieve the message m messages ago from the top of the
> > message queue.
> >
> > To implement this, we patched Kafka itself to create a shadow Kafka
> > topic such that for every message received in topic A, we would write
> > the offset of said message from topic A into the shadow topic A'. Since
> > offsets are fixed width, the topic A' is effectively an index, and
> > finding m messages ago is simple math: read from offset
> > n - m*(fixed message size), where n is the current offset of A' and m
> > is the number of messages ago, which will result in an offset in
> > topic A.
> >
> > Can this new ability you have implemented provide that kind of
> > functionality?
> >
> > On Mon, Oct 8, 2012 at 12:16 PM, Jay Kreps (JIRA) <j...@apache.org>
> > wrote:
> >
> > >     [ https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
> > >
> > > Jay Kreps resolved KAFKA-506.
> > > -----------------------------
> > >
> > >     Resolution: Fixed
> > >
> > > Committed.
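The shadow-topic arithmetic quoted above, and why logical offsets make the
shadow index unnecessary, comes down to two one-liners. A rough sketch (the
record width and the numbers are illustrative, not Tagged's actual values):

```python
# With the shadow-topic workaround: topic A' stores fixed-width physical
# offsets of A's messages, so "m messages ago" means reading A' at
# position n - m * record_size, where n is the current end of A'.
RECORD_SIZE = 8  # assumed fixed width of one stored offset in A'

def shadow_read_position(n, m, record_size=RECORD_SIZE):
    """Position in shadow topic A' holding the offset of A's message m back."""
    return n - m * record_size

# With native logical offsets the index topic disappears entirely: if the
# end of the log is offset 9876, the message 100 back is simply 9776,
# matching Jay's example.
def m_messages_ago(end_offset, m):
    return end_offset - m

assert shadow_read_position(8000, 100) == 7200
assert m_messages_ago(9876, 100) == 9776
```

So the answer to Taylor's question is yes: the subtraction that previously
required maintaining A' now works directly against the main topic.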
> > >
> > > > Store logical offset in log
> > > > ---------------------------
> > > >
> > > >                 Key: KAFKA-506
> > > >                 URL: https://issues.apache.org/jira/browse/KAFKA-506
> > > >             Project: Kafka
> > > >          Issue Type: Bug
> > > >    Affects Versions: 0.8
> > > >            Reporter: Jay Kreps
> > > >            Assignee: Jay Kreps
> > > >             Fix For: 0.8
> > > >
> > > >         Attachments: KAFKA-506-phase-2.patch,
> > > > KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch,
> > > > KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch,
> > > > KAFKA-506-phase-2-v5.patch, KAFKA-506-phase-2-v5.patch,
> > > > KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch,
> > > > KAFKA-506-v4-changes-since-v3.patch
> > > >
> > > > Currently we only support retention by dropping entire segment
> > > > files. A more nuanced retention policy would allow dropping
> > > > individual messages from a segment file by recopying it. This is not
> > > > currently possible because the lookup structure we use to locate
> > > > messages is based on the file offset directly.
> > > > To fix this we should move to a sequential, logical offset
> > > > (0,1,2,3,...) which would allow deleting individual messages
> > > > (e.g. 2) without deleting the entire segment.
> > > > It is desirable to make this change in the 0.8 timeframe since we
> > > > are already doing data format changes.
> > > > As part of this we would explicitly store the key field given by the
> > > > producer for partitioning (right now there is no way for the
> > > > consumer to find the value used for partitioning).
> > > > This combination of features would allow a key-based retention
> > > > policy that would clean obsolete values by a user-defined key.
> > > > The specific use case I am targeting is a commit log for local state
> > > > maintained by a process doing some kind of near-real-time
> > > > processing. The process could log out its local state changes and be
> > > > able to restore from this log in the event of a failure.
> > > > However I think this is a broadly useful feature.
> > > > The following changes would be part of this:
> > > > 1. The log format would now be
> > > >      8 byte offset
> > > >      4 byte message_size
> > > >      N byte message
> > > > 2. The offsets would be changed to a sequential, logical number
> > > > rather than the byte offset (e.g. 0,1,2,3,...)
> > > > 3. A local memory-mapped lookup structure will be kept for each log
> > > > segment that contains the mapping from logical to physical offset.
> > > > I propose to break this into two patches. The first makes the log
> > > > format changes but retains the physical offset. The second adds the
> > > > lookup structure and moves to logical offsets.
> > > > Here are a few issues to be considered for the first patch:
> > > > 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One
> > > > surprising thing is that the offset is actually the offset of the
> > > > next message. I think there are actually several uses for the
> > > > current offset. I would propose making this hold the current message
> > > > offset, since with logical offsets the next offset is always just
> > > > current_offset+1. Note that since we no longer require messages to
> > > > be dense, it is not true that if the next offset is N the current
> > > > offset is N-1 (because N-1 may have been deleted). Thoughts or
> > > > objections?
> > > > 2. Currently during iteration over a ByteBufferMessageSet we throw
> > > > an exception if there are zero messages in the set. This is used to
> > > > detect fetches that are smaller than a single message size. I think
> > > > this behavior is misplaced and should be moved up into the consumer.
> > > > 3.
> > > > In addition to adding a key in Message, I made two other changes:
> > > > (1) I moved the CRC to the first field and made it cover the entire
> > > > message contents (previously it only covered the payload), and
> > > > (2) I dropped support for Magic=0, effectively making the attributes
> > > > field required, which simplifies the code (since we are breaking
> > > > compatibility anyway).
> > >
> > > --
> > > This message is automatically generated by JIRA.
> > > If you think it was sent incorrectly, please contact your JIRA
> > > administrators.
> > > For more information on JIRA, see:
> > > http://www.atlassian.com/software/jira
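The on-disk record layout and the logical-to-physical lookup described in
the ticket (8-byte offset, 4-byte message_size, N-byte message, plus a
per-segment index) can be sketched roughly as below. This is only a toy
model of the description: the byte order, the dict-based index (Kafka's is
memory-mapped), and all function names are assumptions, not the real
implementation:

```python
import struct

HEADER = ">qi"  # assumed big-endian: 8-byte offset, 4-byte message_size
HEADER_LEN = struct.calcsize(HEADER)  # 12 bytes

def append(buf, offset, message):
    """Append one record in the described format and return the new buffer."""
    return buf + struct.pack(HEADER, offset, len(message)) + message

def build_segment(messages, base_offset=0):
    """Write messages with sequential logical offsets, recording each
    record's physical position in an index (stand-in for the mmap'd
    lookup structure kept per log segment)."""
    buf, index = b"", {}
    for i, msg in enumerate(messages):
        index[base_offset + i] = len(buf)  # logical offset -> byte position
        buf = append(buf, base_offset + i, msg)
    return buf, index

def read(buf, index, logical_offset):
    """Look up the physical position of a logical offset and decode it."""
    pos = index[logical_offset]
    offset, size = struct.unpack_from(HEADER, buf, pos)
    msg = buf[pos + HEADER_LEN : pos + HEADER_LEN + size]
    return offset, msg

segment, idx = build_segment([b"hello", b"world"])
assert read(segment, idx, 1) == (1, b"world")
```

Note how this also illustrates point 1 of the patch discussion: with dense
sequential offsets the next offset is always current_offset+1, but once a
segment is "cleaned" the index may simply lack some keys, so offsets need
not be contiguous.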