Add audit trail to Kafka
------------------------

                 Key: KAFKA-260
                 URL: https://issues.apache.org/jira/browse/KAFKA-260
             Project: Kafka
          Issue Type: New Feature
    Affects Versions: 0.8
            Reporter: Jay Kreps
            Assignee: Jay Kreps


LinkedIn has a system that does monitoring on top of our data flow to ensure 
all data is delivered to all consumers of data. This works by having each 
logical "tier" through which data passes produce messages to a central 
"audit-trail" topic; these messages give a time period and the number of 
messages that passed through that tier in that time period. Examples of tiers 
might be "producer", "broker", "hadoop-etl", etc. This makes it
possible to compare the total events for a given time period to ensure that all 
events that are produced are consumed by all consumers.

This turns out to be extremely useful. We also have an application that 
"balances the books" and checks that all data is consumed in a timely fashion. 
This gives graphs for each topic and shows any data loss and the lag at which 
the data is consumed (if any).

This would be an optional feature that would allow you to do this kind of 
reconciliation automatically for all the topics Kafka hosts, against all the 
tiers of applications that interact with the data.

Some details: the proposed format of the data is JSON, using the following 
format for messages:

{
  "time": 1301727060032,  // the timestamp at which this audit message is sent
  "topic": "my_topic_name",  // the topic this audit data is for
  "tier": "producer",  // a user-defined "tier" name
  "bucket_start": 1301726400000,  // the beginning of the time bucket this data applies to
  "bucket_end": 1301727000000,  // the end of the time bucket this data applies to
  "host": "my_host_name.datacenter.linkedin.com",  // the server that this was sent from
  "datacenter": "hlx32",  // the datacenter this occurred in
  "application": "newsfeed_service",  // a user-defined application name
  "guid": "51656274-a86a-4dff-b824-8e8e20a6348f",  // a unique identifier for this message
  "count": 43634
}
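
As a rough illustration, the client-side hook that emits one of these records 
could look something like the sketch below. This is illustrative only: the 
serialization is done by hand for clarity, the AuditRecord class is made up for 
this example, and actually producing the record to the audit topic is left to 
whatever producer the audit code ends up using.

import java.util.UUID;

// Hypothetical helper that renders one audit record in the format above.
public class AuditRecord {
    public static String toJson(String topic, String tier,
                                long bucketStart, long bucketEnd,
                                String host, String datacenter,
                                String application, long count) {
        return String.format(
            "{\"time\":%d,\"topic\":\"%s\",\"tier\":\"%s\"," +
            "\"bucket_start\":%d,\"bucket_end\":%d,\"host\":\"%s\"," +
            "\"datacenter\":\"%s\",\"application\":\"%s\"," +
            "\"guid\":\"%s\",\"count\":%d}",
            System.currentTimeMillis(), topic, tier, bucketStart, bucketEnd,
            host, datacenter, application, UUID.randomUUID(), count);
    }
}

The resulting string would then be produced to the central "audit-trail" topic 
like any other message.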

DISCUSSION

Time is complex:
1. The audit data must be based on a timestamp in the events, not the time on 
the machine processing the event. Using this timestamp means that all 
downstream consumers will report audit data on the right time bucket. This 
means that there must be a timestamp in the event, which we don't currently 
require. Arguably we should just add a timestamp to the events, but I think it 
is sufficient for now to allow the user to provide a function to extract the 
time from their events (see the sketch after this list).
2. For counts to reconcile exactly we can only do analysis at a granularity 
based on the least common multiple of the bucket sizes used by the tiers. The 
simplest approach is just to configure them all to use the same bucket size. We 
currently use a bucket size of 10 mins, but anything from 1-60 mins is probably 
reasonable.
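
To make the time handling concrete, here is a minimal sketch of the 
user-supplied timestamp hook and the bucketing arithmetic; the interface and 
class names are hypothetical, not an existing Kafka API:

// Hypothetical hook the user implements to expose the timestamp
// embedded in their own message format.
public interface MessageTimestampExtractor {
    // Return the event time (epoch millis) carried inside the message.
    long extractTimestamp(byte[] message);
}

public class TimeBucket {
    // Floor the event timestamp down to the start of its bucket. With a
    // 10 min bucket (600000 ms), an event stamped 1301726650000 falls in
    // the bucket [1301726400000, 1301727000000), matching the
    // bucket_start/bucket_end values in the JSON example above.
    public static long bucketStart(long eventTimestampMs, long bucketSizeMs) {
        return (eventTimestampMs / bucketSizeMs) * bucketSizeMs;
    }

    public static long bucketEnd(long eventTimestampMs, long bucketSizeMs) {
        return bucketStart(eventTimestampMs, bucketSizeMs) + bucketSizeMs;
    }
}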

For analysis purposes one tier is designated as the source tier and we do 
reconciliation against its count (e.g. if another tier has fewer messages, the 
difference is treated as loss; if it has more, as duplication).
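
As a sketch of that reconciliation, assuming the audit records for one topic 
and one time bucket have already been summed into a per-tier count map (all 
names here are illustrative):

import java.util.Map;

// Compare every tier's total for one (topic, bucket) against the source tier.
public class Reconciler {
    public static void reconcile(String sourceTier, Map<String, Long> countsByTier) {
        long sourceCount = countsByTier.get(sourceTier);
        for (Map.Entry<String, Long> entry : countsByTier.entrySet()) {
            if (entry.getKey().equals(sourceTier))
                continue;
            long diff = entry.getValue() - sourceCount;
            if (diff < 0)
                System.out.printf("%s lost %d messages%n", entry.getKey(), -diff);
            else if (diff > 0)
                System.out.printf("%s saw %d duplicate messages%n", entry.getKey(), diff);
        }
    }
}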

Note that this system makes false positives possible since you can lose an 
audit message. It also makes false negatives possible since if you lose both 
normal messages and the associated audit messages it will appear that 
everything adds up. The latter problem is astronomically unlikely to happen 
exactly, though.

This would integrate into the client (producer and consumer both) in the 
following way:
1. The user provides a way to get timestamps from messages (required)
2. The user configures the tier name, host name, datacenter name, and 
application name as part of the consumer and producer config. We can provide 
reasonable defaults if these are not supplied (e.g. if it is a Producer then 
set the tier to "producer" and get the hostname from the OS); a sketch of such 
defaults follows this list.

The application that processes this data is currently a Java Jetty app that 
talks to MySQL. It feeds off the audit topic in Kafka and runs both automatic 
monitoring checks and graphical displays of the data. The data layer is not 
terribly scalable, but because the audit data is sent only periodically this is 
enough to allow us to audit thousands of servers on very modest hardware, and 
having SQL access makes diving into the data to trace problems back to 
particular hosts easier.

LOGISTICS
I would recommend the following steps:
1. Add the audit application; the proposal is to add a new top-level 
directory, equivalent to core or perf, called "audit" to house this 
application. At this point it would just be sitting there, not really being 
used.
2. Integrate these capabilities into the producer as part of the refactoring we 
are doing now.
3. Integrate into the consumer when possible.


