Thanks Jay,

Sounds pretty good, though it seems you will likely be trading at least one
I/O operation for the index lookup, unless you can keep all indexes in
memory, which may not be feasible beyond a certain size. I'm just guessing
though...

It seems that with this new methodology there is no longer a need for
Tagged's special Kafka branch, as reverse indexing (m messages ago) is now
natively supported... that's nice.

And...in my new job I expect I may want to use Kafka as a commit log in a
very similar way...so the functionality you describe sounds useful.

On Mon, Oct 8, 2012 at 1:59 PM, Jay Kreps <jay.kr...@gmail.com> wrote:

> Hi Taylor,
>
> These are good questions.
>
> 1. It is required. It seemed cleaner to have one mechanism for mapping the
> "log change number" to the physical message and make that efficient rather
> than have lots of options.
>
> 2. Zero-copy transfer is still used. The performance impact can be
> understood in two pieces: (1) finding the chunk of log you need, (2)
> transferring that chunk. This only impacts (1). There is some performance
> impact for (1), but I think it can be made fast enough that it will not be
> significant. My next task is to do some performance work on the log system
> and I will quantify the impact more then.
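>
> To make (2) concrete, here is a minimal sketch of the zero-copy transfer
> path, assuming a FileChannel-backed log segment and a connected
> SocketChannel (the names are illustrative, not the actual Kafka
> internals):
>
>   import java.io.RandomAccessFile;
>   import java.nio.channels.FileChannel;
>   import java.nio.channels.SocketChannel;
>
>   class ZeroCopySketch {
>     // Step (1), finding the byte range, happens before this call; step
>     // (2) hands the range to the kernel via sendfile, so the bytes never
>     // pass through user space.
>     static void sendChunk(RandomAccessFile segment, SocketChannel socket,
>                           long startPos, long length) throws Exception {
>       FileChannel ch = segment.getChannel();
>       long sent = 0;
>       while (sent < length) {
>         // transferTo() may move fewer bytes than requested, so loop.
>         sent += ch.transferTo(startPos + sent, length - sent, socket);
>       }
>     }
>   }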
>
> 3. This is a good question. First you asked if this is a special case of
> general indexing. It is. But because indexing monotonically increasing,
> fixed-width numerical offsets can be done much more efficiently, it is an
> important special case. The idea of creating more general indexes seems
> appealing at first, but when you think about it more you realize you need a
> query language, a data model, and many other things for this to be useful,
> and essentially you are implementing a database, which isn't what Kafka is
> meant to be.
>
> Here is what this adds right off the bat with the patch I checked in:
> a. It is aesthetically nice. The first message will have offset 0, the
> second message 1, the 100th message offset 100, etc.
> b. You can read the messages in reverse if you like. If the end of the log
> is 9876 then 100 messages before that is 9776.
> c. It is less error-prone: there are no invalid offsets and no byte
> arithmetic.
> d. It fixes the commit() functionality with respect to compressed messages.
> Previously there was effectively no offset for messages inside of a
> compressed set, so one could only commit one's position at compressed
> message set boundaries. This made the semantics of compressed messages very
> problematic.
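>
> A toy illustration of (b), assuming hypothetical getEndOffset/fetch
> helpers rather than any real API:
>
>   // With dense logical offsets, "m messages ago" is plain arithmetic.
>   long end = getEndOffset("topic", 0);   // e.g. 9876
>   long from = end - 100;                 // 9776, no byte arithmetic
>   fetch("topic", 0, from);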
>
> One of the primary motivators is not the above items, but rather the
> ability to allow more sophisticated retention policies. Some systems at
> LinkedIn use Kafka as a kind of commit log. That is, they take a stream of
> changes from Kafka, process them, and apply some munged version of this to
> a local search index, key-value store, or other data structure for serving.
> Virtually all of these systems have some notion of a primary key for
> records. The general problem these systems have to solve is what they need
> to do if they need to recreate their local state (say if a node fails, or
> they need to reprocess data in a different way). Since Kafka will only
> contain a fixed range of data, they can't really reprocess from Kafka
> unless the data they serve is also time-based, as we will have cleaned out
> old messages. But you could imagine a slightly different retention strategy
> in Kafka that allowed you to retain messages by some primary key. So rather
> than throwing away old segments you would have the option to "clean" old
> segments and just retain the latest record for each primary key. That would
> allow using the Kafka log for all restore functionality and still guarantee
> that you restored the latest value for each key. This retention strategy
> would only make sense to use for topics that contain data with a primary
> key, so it would be optional. I think this is actually very powerful when
> combined with replication because it is a way to get a highly available
> "commit" or "restore" log.
>
> -Jay
>
>
> On Mon, Oct 8, 2012 at 12:40 PM, Taylor Gautier <tgaut...@gmail.com>
> wrote:
>
> > I seem to be late to the party on this one - can you summarize what
> > happened to the log files - and wire protocol - as a result of this
> > change?
> >
> > I have some questions regarding what implications this change has:
> >
> > 1) Is it required or optional? In other words, can the physical offset
> > mechanism still be used?
> > 2) If required, is it going to change Kafka's ability to do
> > ultra-efficient I/O (currently near-zero memory is used; sendfile keeps
> > things fast and tidy)?
> > 3) What additional capabilities does this give - for example can one do
> > negative indexing and positive indexing from a given offset?
> >
> > I am wondering, is this a specific instance of a more general class of
> > indexing?
> >
> > By negative indexing, I mean, for example, at Tagged we wanted the
> > capability to retrieve the message m messages ago from the top of the
> > message queue.
> >
> > To implement this, we patched Kafka itself to create a shadow Kafka
> > topic: for every message received in topic A, it would write the offset
> > of said message from topic A into the shadow topic A'. Since offsets are
> > fixed width, topic A' is effectively an index, and finding m messages
> > ago is simple math --> read from offset n - m*(fixed message size),
> > where n is the current offset of A' and m is the number of messages ago.
> > This yields an offset in topic A.
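> >
> > In code, that lookup amounted to the following (names are illustrative;
> > the entry width is whatever fixed size each message in A' occupies on
> > disk):
> >
> >   class ShadowIndexSketch {
> >     static final long FIXED_MSG_SIZE = 34; // assumed bytes per A' entry
> >
> >     // Byte offset in A' of the entry for "m messages ago", where n is
> >     // the current (tail) byte offset of A'. Reading A' at this position
> >     // yields the physical offset into topic A.
> >     static long indexPosition(long n, long m) {
> >       return n - m * FIXED_MSG_SIZE;
> >     }
> >   }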
> >
> > Can this new ability you have implemented provide that kind of
> > functionality?
> >
> > On Mon, Oct 8, 2012 at 12:16 PM, Jay Kreps (JIRA) <j...@apache.org>
> wrote:
> >
> > >
> > > [
> > > https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
> > > ]
> > >
> > > Jay Kreps resolved KAFKA-506.
> > > -----------------------------
> > >
> > >     Resolution: Fixed
> > >
> > > Committed.
> > >
> > > > Store logical offset in log
> > > > ---------------------------
> > > >
> > > >                 Key: KAFKA-506
> > > >                 URL: https://issues.apache.org/jira/browse/KAFKA-506
> > > >             Project: Kafka
> > > >          Issue Type: Bug
> > > >    Affects Versions: 0.8
> > > >            Reporter: Jay Kreps
> > > >            Assignee: Jay Kreps
> > > >             Fix For: 0.8
> > > >
> > > >         Attachments: KAFKA-506-phase-2.patch,
> > > > KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch,
> > > > KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch,
> > > > KAFKA-506-phase-2-v5.patch, KAFKA-506-phase-2-v5.patch,
> > > > KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch,
> > > > KAFKA-506-v4-changes-since-v3.patch
> > > >
> > > >
> > > > Currently we only support retention by dropping entire segment
> > > > files. A more nuanced retention policy would allow dropping
> > > > individual messages from a segment file by recopying it. This is not
> > > > currently possible because the lookup structure we use to locate
> > > > messages is based on the file offset directly.
> > > > To fix this we should move to a sequential, logical offset
> > > > (0,1,2,3,...) which would allow deleting individual messages (e.g.
> > > > 2) without deleting the entire segment.
> > > > It is desirable to make this change in the 0.8 timeframe since we
> > > > are already doing data format changes.
> > > > As part of this we would explicitly store the key field given by
> > > > the producer for partitioning (right now there is no way for the
> > > > consumer to find the value used for partitioning).
> > > > This combination of features would allow a key-based retention
> > > > policy that would clean obsolete values by a user-defined key.
> > > > The specific use case I am targeting is a commit log for local
> > > > state maintained by a process doing some kind of near-real-time
> > > > processing. The process could log out its local state changes and
> > > > be able to restore from this log in the event of a failure. However
> > > > I think this is a broadly useful feature.
> > > > The following changes would be part of this:
> > > > 1. The log format would now be
> > > >       8 byte offset
> > > >       4 byte message_size
> > > >       N byte message
> > > > 2. The offsets would be changed to a sequential, logical number
> > > > rather than the byte offset (e.g. 0,1,2,3,...)
> > > > 3. A local memory-mapped lookup structure will be kept for each
> > > > log segment that contains the mapping from logical to physical
> > > > offset (see the sketch after this list).
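> > > >
> > > > A rough sketch of the lookup in item 3, assuming a slot layout of
> > > > an 8-byte logical offset plus an 8-byte file position (the real
> > > > structure may differ):
> > > >
> > > >   import java.nio.MappedByteBuffer;
> > > >
> > > >   class OffsetIndexSketch {
> > > >     static final int SLOT = 16; // 8-byte logical + 8-byte physical
> > > >
> > > >     // Binary-search the memory-mapped index for the largest logical
> > > >     // offset <= target and return its physical file position.
> > > >     // Assumes target >= the first entry's logical offset.
> > > >     static long physicalPosition(MappedByteBuffer index, int slots,
> > > >                                  long target) {
> > > >       int lo = 0, hi = slots - 1, found = 0;
> > > >       while (lo <= hi) {
> > > >         int mid = (lo + hi) >>> 1;
> > > >         long logical = index.getLong(mid * SLOT);
> > > >         if (logical <= target) { found = mid; lo = mid + 1; }
> > > >         else { hi = mid - 1; }
> > > >       }
> > > >       return index.getLong(found * SLOT + 8);
> > > >     }
> > > >   }
> > > >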
> > > > I propose to break this into two patches. The first makes the log
> > > > format changes, but retains the physical offset. The second adds
> > > > the lookup structure and moves to logical offset.
> > > > Here are a few issues to be considered for the first patch:
> > > > 1. Currently a MessageSet implements Iterable[MessageAndOffset].
> > > > One surprising thing is that the offset is actually the offset of
> > > > the next message. I think there are actually several uses for the
> > > > current offset. I would propose making this hold the current
> > > > message offset since with logical offsets the next offset is always
> > > > just current_offset+1. Note that since we no longer require
> > > > messages to be dense, it is not true that if the next offset is N
> > > > the current offset is N-1 (because N-1 may have been deleted).
> > > > Thoughts or objections?
> > > > 2. Currently during iteration over a ByteBufferMessageSet we throw
> > > > an exception if there are zero messages in the set. This is used to
> > > > detect fetches that are smaller than a single message size. I think
> > > > this behavior is misplaced and should be moved up into the consumer.
> > > > 3. In addition to adding a key in Message, I made two other
> > > > changes: (1) I moved the CRC to the first field and made it cover
> > > > the entire message contents (previously it only covered the
> > > > payload), (2) I dropped support for Magic=0, effectively making the
> > > > attributes field required, which simplifies the code (since we are
> > > > breaking compatibility anyway).
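> > > >
> > > > For illustration, a CRC over the full message contents is a few
> > > > lines with java.util.zip (a sketch, not the actual serialization
> > > > code):
> > > >
> > > >   import java.util.zip.CRC32;
> > > >
> > > >   class CrcSketch {
> > > >     // The CRC now covers everything after the CRC field itself
> > > >     // (attributes, key, and payload), not just the payload.
> > > >     static long crcOf(byte[] contents) {
> > > >       CRC32 crc = new CRC32();
> > > >       crc.update(contents, 0, contents.length);
> > > >       return crc.getValue();
> > > >     }
> > > >   }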
> > >
> > > --
> > > This message is automatically generated by JIRA.
> > > If you think it was sent incorrectly, please contact your JIRA
> > > administrators.
> > > For more information on JIRA, see:
> > > http://www.atlassian.com/software/jira
> > >
> >
>
