Hi Taylor,

These are good questions.

1. It is required. It seemed cleaner to have one mechanism for mapping the
"log change number" to the physical message and make that efficient rather
than have lots of options.

2. Zero-copy transfer is still used. The performance impact can be
understood in two pieces: (1) finding the chunk of log you need, (2)
transferring that chunk. This only impacts (1). There is some performance
impact for (1), but I think it can be made fast enough that it will not be
significant. My next task is to do some performance work on the log system
and I will quantify the impact more then.

3. This is a good question. First you asked if this is a special case of
general indexing. It is. But because indexing monotonically increasing,
fixed-width numerical offsets can be done much more efficiently, it is an
important special case. The idea of creating more general indexes seems
appealing at first, but when you think about it more you realize you need a
query language, a data model, and many other things for this to be useful.
Essentially you would be implementing a database, which isn't what Kafka is
meant to be.
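
To make the "efficient special case" concrete, here is a minimal Python
sketch of resolving a logical offset to a byte position with a binary
search. The parallel-list index layout and the name find_position are
assumptions for illustration; they are not the structure used in the patch.

```python
import bisect

def find_position(offsets, positions, target_offset):
    """Return the byte position to start scanning from for target_offset.

    offsets and positions are parallel lists sorted by logical offset
    (a hypothetical sparse index, one entry every so many messages).
    """
    # Rightmost index entry whose logical offset is <= target_offset.
    slot = bisect.bisect_right(offsets, target_offset) - 1
    if slot < 0:
        raise KeyError("offset %d is before the start of the index" % target_offset)
    return positions[slot]

# Example index: logical offsets 0, 100, 200 live at these byte positions.
offsets = [0, 100, 200]
positions = [0, 4096, 8192]
start = find_position(offsets, positions, 150)  # scan forward from this byte
```

Because the keys are sorted integers, the lookup is a plain binary search
followed by a short forward scan, and the transfer of the chunk itself is
untouched.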

Here is what this adds right off the bat with the patch I checked in:
a. It is esthetically nice. The first message will have offset 0, the
second message offset 1, the 100th message offset 99, etc.
b. You can read the messages in reverse if you like. If the end of the log
is 9876 then 100 messages before that is 9776.
c. It is less error prone: There are no invalid offsets and no byte
arithmetic.
d. It fixes the commit() functionality with respect to compressed messages.
Previously there was effectively no offset for messages inside of a
compressed set, so one could only commit one's position at compressed
message set boundaries. This made the semantics of compressed messages very
problematic.
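
Points (b) and (c) can be illustrated with a small Python sketch. It
assumes the entry layout from the KAFKA-506 description quoted below (8 byte
offset, 4 byte message_size, N byte message). The big-endian encoding and
the helper names append and read_all are assumptions for illustration only.

```python
import struct

# 8 byte offset followed by 4 byte message_size (big-endian is assumed here).
HEADER = struct.Struct(">qi")

def append(log, offset, payload):
    """Append one entry (header plus payload) to an in-memory log."""
    log.extend(HEADER.pack(offset, len(payload)))
    log.extend(payload)

def read_all(log):
    """Decode every (offset, payload) pair stored in the log."""
    entries = []
    pos = 0
    while pos < len(log):
        offset, size = HEADER.unpack_from(log, pos)
        pos += HEADER.size
        entries.append((offset, bytes(log[pos:pos + size])))
        pos += size
    return entries

log = bytearray()
for i, payload in enumerate([b"a", b"bb", b"ccc"]):
    append(log, i, payload)

entries = read_all(log)
end_offset = entries[-1][0] + 1  # offset the next message would receive
two_back = end_offset - 2        # plain integer arithmetic, no byte math
```

The payloads have different sizes, yet "two messages before the end" is
still a simple subtraction on logical offsets.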

One of the primary motivators is not the above items, but rather the
ability to allow more sophisticated retention policies. Some systems at
LinkedIn use Kafka as a kind of commit log. That is, they take a stream of
changes from Kafka, process them, and apply some munged version of this to
a local search index, key-value store, or other data structure for serving.
Virtually all of these systems have some notion of a primary key for
records. The general problem these systems have to solve is what they need
to do if they need to recreate their local state (say if a node fails, or
they need to reprocess data in a different way). Since Kafka will only
contain a fixed range of data, they can't really reprocess from Kafka
unless the data they serve is also time-based, because we will have cleaned
out old messages. But you could imagine a slightly different retention strategy
in Kafka that allowed you to retain messages by some primary key. So rather
than throwing away old segments you would have the option to "clean" old
segments and just retain the latest record for each primary key. That would
allow using the Kafka log for all restore functionality and still guarantee
that you restored the latest value for each key. This retention strategy
would only make sense to use for topics that contain data with a primary
key, so it would be optional. I think this is actually very powerful when
combined with replication because it is a way to get a highly available
"commit" or "restore" log.

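The "clean" idea above can be sketched in a few lines of Python. This is an
illustration under assumptions, not the planned implementation: records are
modeled as (offset, key, value) tuples and clean is a hypothetical helper.

```python
def clean(records):
    """Keep only the latest record for each key, preserving offsets and order."""
    latest = {}
    for offset, key, _value in records:
        latest[key] = offset  # a later record for the same key wins
    return [record for record in records if latest[record[1]] == record[0]]

segment = [
    (0, "alice", "v1"),
    (1, "bob", "v1"),
    (2, "alice", "v2"),
    (3, "carol", "v1"),
    (4, "bob", "v2"),
]
cleaned = clean(segment)
```

The surviving records keep their original offsets (2, 3, 4 here), which is
why the offsets have to be logical and are allowed to have gaps.
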
-Jay


On Mon, Oct 8, 2012 at 12:40 PM, Taylor Gautier <tgaut...@gmail.com> wrote:

> I seem to be late to the party on this one - can you summarize what
> happened to the log files - and wire protocol - as a result of this change?
>
> I have some questions regarding what implications this change has:
>
> 1) is it required or optional, iotw can the physical offset mechanism still
> be used?
> 2) if required, is it going to change the ability of kafka for
> ultra-efficient i/o (currently, near zero memory is used, sendfile keeps
> things fast and tidy)
> 3) What additional capabilities does this give - for example can one do
> negative indexing and positive indexing from a given offset?
>
> I am wondering, is this a specific instance of a more general class of
> indexing?
>
> By negative indexing, I mean for example at Tagged we wanted the capability
> to retrieve the message at m messages ago from the top of the message
> queue.
>
> To implement this, we patched kafka itself to create a shadow kafka topic
> that for every message received in topic A, would write the offset of said
> message from topic A into the shadow topic A'.  Since offsets are fixed
> width, the topic A' is effectively an index and to find m messages ago is
> simple math -->  read from offset n - m*(fixed message size) where n is the
> current offset of A' and m is num messages ago which will result in an
> offset in topic A..
>
> Can this new ability you have implemented provide that kind of
> functionality?
>
> On Mon, Oct 8, 2012 at 12:16 PM, Jay Kreps (JIRA) <j...@apache.org> wrote:
>
> >
> >      [
> >
> https://issues.apache.org/jira/browse/KAFKA-506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
> ]
> >
> > Jay Kreps resolved KAFKA-506.
> > -----------------------------
> >
> >     Resolution: Fixed
> >
> > Committed.
> >
> > > Store logical offset in log
> > > ---------------------------
> > >
> > >                 Key: KAFKA-506
> > >                 URL: https://issues.apache.org/jira/browse/KAFKA-506
> > >             Project: Kafka
> > >          Issue Type: Bug
> > >    Affects Versions: 0.8
> > >            Reporter: Jay Kreps
> > >            Assignee: Jay Kreps
> > >             Fix For: 0.8
> > >
> > >         Attachments: KAFKA-506-phase-2.patch,
> > KAFKA-506-phase-2-v1.patch, KAFKA-506-phase-2-v2.patch,
> > KAFKA-506-phase-2-v3.patch, KAFKA-506-phase-2-v4.patch,
> > KAFKA-506-phase-2-v5.patch, KAFKA-506-phase-2-v5.patch,
> > KAFKA-506-v1-draft.patch, KAFKA-506-v1.patch,
> > KAFKA-506-v4-changes-since-v3.patch
> > >
> > >
> > > Currently we only support retention by dropping entire segment files. A
> > more nuanced retention policy would allow dropping individual messages
> from
> > a segment file by recopying it. This is not currently possible because
> the
> > lookup structure we use to locate messages is based on the file offset
> > directly.
> > > To fix this we should move to a sequential, logical offset
> (0,1,2,3,...)
> > which would allow deleting individual messages (e.g. 2) without deleting
> > the entire segment.
> > > It is desirable to make this change in the 0.8 timeframe since we are
> > already doing data format changes.
> > > As part of this we would explicitly store the key field given by the
> > producer for partitioning (right now there is no way for the consumer to
> > find the value used for partitioning).
> > > This combination of features would allow a key-based retention policy
> > that would clean obsolete values either by a user defined key.
> > > The specific use case I am targeting is a commit log for local state
> > maintained by a process doing some kind of near-real-time processing. The
> > process could log out its local state changes and be able to restore from
> > this log in the event of a failure. However I think this is a broadly
> > useful feature.
> > > The following changes would be part of this:
> > > 1. The log format would now be
> > >       8 byte offset
> > >       4 byte message_size
> > >       N byte message
> > > 2. The offsets would be changed to a sequential, logical number rather
> > than the byte offset (e.g. 0,1,2,3,...)
> > > 3. A local memory-mapped lookup structure will be kept for each log
> > segment that contains the mapping from logical to physical offset.
> > > I propose to break this into two patches. The first makes the log
> format
> > changes, but retains the physical offset. The second adds the lookup
> > structure and moves to logical offset.
> > > Here are a few issues to be considered for the first patch:
> > > 1. Currently a MessageSet implements Iterable[MessageAndOffset]. One
> > surprising thing is that the offset is actually the offset of the next
> > message. I think there are actually several uses for the current offset.
> I
> > would propose making this hold the current message offset since with
> > logical offsets the next offset is always just current_offset+1. Note
> that
> > since we no longer require messages to be dense, it is not true that if
> the
> > next offset is N the current offset is N-1 (because N-1 may have been
> > deleted). Thoughts or objections?
> > > 2. Currently during iteration over a ByteBufferMessageSet we throw an
> > exception if there are zero messages in the set. This is used to detect
> > fetches that are smaller than a single message size. I think this
> behavior
> > is misplaced and should be moved up into the consumer.
> > > 3. In addition to adding a key in Message, I made two other changes:
> (1)
> > I moved the CRC to the first field and made it cover the entire message
> > contents (previously it only covered the payload), (2) I dropped support
> > for Magic=0, effectively making the attributes field required, which
> > simplifies the code (since we are breaking compatibility anyway).
> >
> > --
> > This message is automatically generated by JIRA.
> > If you think it was sent incorrectly, please contact your JIRA
> > administrators
> > For more information on JIRA, see:
> http://www.atlassian.com/software/jira
> >
>
