Implements a per-cpu-local iterator and a channel-wide iterator, as well as a
read() file operation based on these iterators. Ring buffer clients can use
either the iterators directly or the read() file operation.

The channel-wide iterator performs a timestamp-ordered fusion merge of the
per-cpu buffers using a priority heap.
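
As an illustration (not part of the patch), a kernel-side consumer of the
channel-wide iterator could look roughly like the following sketch. It assumes
a channel configured with RING_BUFFER_ITERATOR output and a "dest" buffer
large enough for the largest record payload; error handling is trimmed.

	#include <linux/ringbuffer/iterator.h>

	/* Sketch: drain all records of a channel in timestamp order. */
	static void drain_channel(struct channel *chan, void *dest)
	{
		struct ring_buffer *buf;
		ssize_t len;

		if (channel_iterator_open(chan))
			return;
		for (;;) {
			len = channel_get_next_record(chan, &buf);
			if (len == -ENODATA)
				break;	/* all buffers empty and finalized */
			if (len == -EAGAIN)
				continue;	/* temporarily empty; real code
						   would wait on chan->read_wait */
			/* copy the current record payload into dest */
			read_current_record(buf, dest);
		}
		channel_iterator_release(chan);
	}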

Signed-off-by: Mathieu Desnoyers <[email protected]>
---
 include/linux/ringbuffer/iterator.h   |   70 ++
 lib/ringbuffer/Makefile               |    1 +
 lib/ringbuffer/ring_buffer_iterator.c |  796 ++++++++++++++++++++++++++++++++++
 3 files changed, 867 insertions(+)
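
For reference (not part of the patch), a minimal userspace consumer of the
channel payload file only needs a plain read() loop; the debugfs path below is
hypothetical and depends on where the ring buffer client exposes the file:

	/* Hypothetical userspace reader of a channel payload file. */
	#include <fcntl.h>
	#include <stdio.h>
	#include <unistd.h>

	int main(void)
	{
		char buf[4096];
		ssize_t len;
		int fd = open("/sys/kernel/debug/mychan/payload", O_RDONLY);

		if (fd < 0)
			return 1;
		/* read() returns 0 once all buffers are empty and finalized */
		while ((len = read(fd, buf, sizeof(buf))) > 0)
			fwrite(buf, 1, len, stdout);
		close(fd);
		return 0;
	}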

Index: linux.trees.git/lib/ringbuffer/ring_buffer_iterator.c
===================================================================
--- /dev/null   1970-01-01 00:00:00.000000000 +0000
+++ linux.trees.git/lib/ringbuffer/ring_buffer_iterator.c       2010-08-17 12:53:17.000000000 -0400
@@ -0,0 +1,796 @@
+/*
+ * ring_buffer_iterator.c
+ *
+ * (C) Copyright 2010 - Mathieu Desnoyers <[email protected]>
+ *
+ * Ring buffer and channel iterators. Get each event of a channel in order.
+ * Uses a prio heap for per-cpu buffers, giving an O(log(NR_CPUS)) algorithmic
+ * complexity for the "get next event" operation.
+ *
+ * Author:
+ *     Mathieu Desnoyers <[email protected]>
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/ringbuffer/iterator.h>
+#include <linux/jiffies.h>
+#include <linux/module.h>
+
+/*
+ * Safety factor taking internal kernel interrupt latency into account.
+ * Assuming 250ms worst-case latency.
+ */
+#define MAX_SYSTEM_LATENCY     250
+
+/*
+ * Maximum delta expected between trace clocks, in nanoseconds. At most 1 jiffy.
+ */
+#define MAX_CLOCK_DELTA                (jiffies_to_usecs(1) * 1000)
+
+/**
+ * ring_buffer_get_next_record - Get the next record in a buffer.
+ * @chan: channel
+ * @buf: buffer
+ *
+ * Returns the size of the event read, -EAGAIN if the buffer is empty,
+ * -ENODATA if the buffer is empty and finalized. The buffer must already be
+ * opened for reading.
+ */
+ssize_t ring_buffer_get_next_record(struct channel *chan,
+                                   struct ring_buffer *buf)
+{
+       const struct ring_buffer_config *config = chan->backend.config;
+       struct ring_buffer_iter *iter = &buf->iter;
+       int ret;
+
+restart:
+       switch (iter->state) {
+       case ITER_GET_SUBBUF:
+               ret = ring_buffer_get_next_subbuf(buf);
+               if (ret && !ACCESS_ONCE(buf->finalized)
+                   && config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
+                       /*
+                        * Use "pull" scheme for global buffers. The reader
+                        * itself flushes the buffer to "pull" data not visible
+                        * to readers yet. Flush current subbuffer and re-try.
+                        *
+                        * Per-CPU buffers rather use a "push" scheme because
+                        * the IPI needed to flush all CPU's buffers is too
+                        * costly. In the "push" scheme, the reader waits for
+                        * the writer periodic deferrable timer to flush the
+                        * buffers (keeping track of a quiescent state
+                        * timestamp). Therefore, the writer "pushes" data out
+                        * of the buffers rather than letting the reader "pull"
+                        * data from the buffer.
+                        */
+                       ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+                       ret = ring_buffer_get_next_subbuf(buf);
+               }
+               if (ret)
+                       return ret;
+               iter->consumed = buf->cons_snapshot;
+               iter->data_size = ring_buffer_get_read_data_size(config, buf);
+               iter->read_offset = iter->consumed;
+               /* skip header */
+               iter->read_offset += config->cb.subbuffer_header_size();
+               iter->state = ITER_TEST_RECORD;
+               goto restart;
+       case ITER_TEST_RECORD:
+               if (iter->read_offset - iter->consumed >= iter->data_size) {
+                       iter->state = ITER_PUT_SUBBUF;
+               } else {
+                       CHAN_WARN_ON(chan, !config->cb.record_get);
+                       config->cb.record_get(config, chan, buf,
+                                             iter->read_offset,
+                                             &iter->header_len,
+                                             &iter->payload_len,
+                                             &iter->timestamp);
+                       iter->read_offset += iter->header_len;
+                       subbuffer_consume_record(config, &buf->backend);
+                       iter->state = ITER_NEXT_RECORD;
+                       return iter->payload_len;
+               }
+               goto restart;
+       case ITER_NEXT_RECORD:
+               iter->read_offset += iter->payload_len;
+               iter->state = ITER_TEST_RECORD;
+               goto restart;
+       case ITER_PUT_SUBBUF:
+               ring_buffer_put_next_subbuf(buf);
+               iter->state = ITER_GET_SUBBUF;
+               goto restart;
+       default:
+               CHAN_WARN_ON(chan, 1);  /* Should not happen */
+               return -EPERM;
+       }
+}
+EXPORT_SYMBOL_GPL(ring_buffer_get_next_record);
+
+static int buf_is_higher(void *a, void *b)
+{
+       struct ring_buffer *bufa = a;
+       struct ring_buffer *bufb = b;
+
+       /* Consider lowest timestamps to be at the top of the heap */
+       return (bufa->iter.timestamp < bufb->iter.timestamp);
+}
+
+static
+void ring_buffer_get_empty_buf_records(const struct ring_buffer_config *config,
+                                      struct channel *chan)
+{
+       struct ptr_heap *heap = &chan->iter.heap;
+       struct ring_buffer *buf, *tmp;
+       ssize_t len;
+
+       list_for_each_entry_safe(buf, tmp, &chan->iter.empty_head,
+                                iter.empty_node) {
+               len = ring_buffer_get_next_record(chan, buf);
+
+               /*
+                * Deal with -EAGAIN and -ENODATA.
+                * len >= 0 means record contains data.
+                * -EBUSY should never happen, because we support only one
+                * reader.
+                */
+               switch (len) {
+               case -EAGAIN:
+                       /* Keep node in empty list */
+                       break;
+               case -ENODATA:
+                       /*
+                        * Buffer is finalized. Don't add it to the list of
+                        * empty buffers, because it has no more data to
+                        * provide, ever.
+                        */
+                       list_del(&buf->iter.empty_node);
+                       break;
+               case -EBUSY:
+                       CHAN_WARN_ON(chan, 1);
+                       break;
+               default:
+                       /*
+                        * Insert buffer into the heap, remove from empty buffer
+                        * list. The heap should never overflow.
+                        */
+                       CHAN_WARN_ON(chan, len < 0);
+                       list_del(&buf->iter.empty_node);
+                       CHAN_WARN_ON(chan, heap_insert(heap, buf) != NULL);
+               }
+       }
+}
+
+static
+void ring_buffer_wait_for_qs(const struct ring_buffer_config *config,
+                            struct channel *chan)
+{
+       u64 timestamp_qs;
+       unsigned long wait_msecs;
+
+       /*
+        * No need to wait if no empty buffers are present.
+        */
+       if (list_empty(&chan->iter.empty_head))
+               return;
+
+       timestamp_qs = config->cb.ring_buffer_clock_read(chan);
+       /*
+        * We need to consider previously empty buffers: do a "get next
+        * record" on each of them and add them to the heap if they have data.
+        * If at least one of them doesn't have data, we need to wait for
+        * switch_timer_interval + MAX_SYSTEM_LATENCY (so we are sure the
+        * buffers have been switched either by the timer or by idle entry) and
+        * check them again, adding them if they have data.
+        */
+       ring_buffer_get_empty_buf_records(config, chan);
+
+       /*
+        * No need to wait if no empty buffers are present.
+        */
+       if (list_empty(&chan->iter.empty_head))
+               return;
+
+       /*
+        * We need to wait for the buffer switch timer to run. If the
+        * CPU is idle, idle entry performed the switch.
+        * TODO: we could optimize further by skipping the sleep if all
+        * empty buffers belong to idle or offline cpus.
+        */
+       wait_msecs = jiffies_to_msecs(chan->switch_timer_interval);
+       wait_msecs += MAX_SYSTEM_LATENCY;
+       msleep(wait_msecs);
+       ring_buffer_get_empty_buf_records(config, chan);
+       /*
+        * Any buffer still in the empty list here cannot possibly
+        * contain an event with a timestamp prior to "timestamp_qs".
+        * The new quiescent state timestamp is the one we grabbed
+        * before waiting for buffer data.  It is therefore safe to
+        * ignore empty buffers up to last_qs timestamp for fusion
+        * merge.
+        */
+       chan->iter.last_qs = timestamp_qs;
+}
+
+/**
+ * channel_get_next_record - Get the next record in a channel.
+ * @chan: channel
+ * @ret_buf: the buffer in which the event is located (output)
+ *
+ * Returns the size of the new current event, -EAGAIN if all buffers are empty,
+ * -ENODATA if all buffers are empty and finalized. The channel must already be
+ * opened for reading.
+ */
+ssize_t channel_get_next_record(struct channel *chan,
+                               struct ring_buffer **ret_buf)
+{
+       const struct ring_buffer_config *config = chan->backend.config;
+       struct ring_buffer *buf;
+       struct ptr_heap *heap;
+       ssize_t len;
+
+       if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
+               *ret_buf = channel_get_ring_buffer(config, chan, 0);
+               return ring_buffer_get_next_record(chan, *ret_buf);
+       }
+
+       heap = &chan->iter.heap;
+
+       /*
+        * Get the next record for the topmost buffer.
+        */
+       buf = heap_maximum(heap);
+       if (buf) {
+               len = ring_buffer_get_next_record(chan, buf);
+               /*
+                * Deal with -EAGAIN and -ENODATA.
+                * len >= 0 means record contains data.
+                */
+               switch (len) {
+               case -EAGAIN:
+                       buf->iter.timestamp = 0;
+                       list_add(&buf->iter.empty_node, &chan->iter.empty_head);
+                       /* Remove topmost buffer from the heap */
+                       CHAN_WARN_ON(chan, heap_remove(heap) != buf);
+                       break;
+               case -ENODATA:
+                       /*
+                        * Buffer is finalized. Remove it from the heap and
+                        * don't add it to the list of empty buffers, because it
+                        * has no more data to provide, ever.
+                        */
+                       CHAN_WARN_ON(chan, heap_remove(heap) != buf);
+                       break;
+               case -EBUSY:
+                       CHAN_WARN_ON(chan, 1);
+                       break;
+               default:
+                       /*
+                        * Reinsert buffer into the heap. Note that heap can be
+                        * partially empty, so we need to use
+                        * heap_replace_max().
+                        */
+                       CHAN_WARN_ON(chan, len < 0);
+                       CHAN_WARN_ON(chan, heap_replace_max(heap, buf) != buf);
+                       break;
+               }
+       }
+
+       buf = heap_maximum(heap);
+       if (!buf || buf->iter.timestamp > chan->iter.last_qs) {
+               /*
+                * Deal with buffers previously showing no data.
+                * Add buffers containing data to the heap, update
+                * last_qs.
+                */
+               ring_buffer_wait_for_qs(config, chan);
+       }
+
+       *ret_buf = buf = heap_maximum(heap);
+       if (buf) {
+               /*
+                * If this warning triggers, you probably need to check your
+                * system interrupt latency. Typical cause: too much printk()
+                * output going to a serial console with interrupts off.
+                * Allow for MAX_CLOCK_DELTA ns timestamp delta going backward.
+                * Observed on SMP KVM setups with trace_clock().
+                */
+               if (chan->iter.last_timestamp
+                   > (buf->iter.timestamp + MAX_CLOCK_DELTA)) {
+                       printk(KERN_WARNING "ring_buffer: timestamps going "
+                              "backward. Last time %llu ns, cpu %d, "
+                              "current time %llu ns, cpu %d, "
+                              "delta %llu ns.\n",
+                              chan->iter.last_timestamp, chan->iter.last_cpu,
+                              buf->iter.timestamp, buf->backend.cpu,
+                              chan->iter.last_timestamp - buf->iter.timestamp);
+                       CHAN_WARN_ON(chan, 1);
+               }
+               chan->iter.last_timestamp = buf->iter.timestamp;
+               chan->iter.last_cpu = buf->backend.cpu;
+               return buf->iter.payload_len;
+       } else {
+               /* Heap is empty */
+               if (list_empty(&chan->iter.empty_head))
+                       return -ENODATA;        /* All buffers finalized */
+               else
+                       return -EAGAIN;         /* Temporarily empty */
+       }
+}
+EXPORT_SYMBOL_GPL(channel_get_next_record);
+
+static
+void ring_buffer_iterator_init(struct channel *chan, struct ring_buffer *buf)
+{
+       if (buf->iter.allocated)
+               return;
+
+       buf->iter.allocated = 1;
+       if (chan->iter.read_open && !buf->iter.read_open) {
+               CHAN_WARN_ON(chan, ring_buffer_open_read(buf) != 0);
+               buf->iter.read_open = 1;
+       }
+
+       /* Add to list of buffers without any current record */
+       if (chan->backend.config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+               list_add(&buf->iter.empty_node, &chan->iter.empty_head);
+}
+
+#ifdef CONFIG_HOTPLUG_CPU
+static
+int __cpuinit channel_iterator_cpu_hotplug(struct notifier_block *nb,
+                                          unsigned long action,
+                                          void *hcpu)
+{
+       unsigned int cpu = (unsigned long)hcpu;
+       struct channel *chan = container_of(nb, struct channel,
+                                           hp_iter_notifier);
+       struct ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
+       const struct ring_buffer_config *config = chan->backend.config;
+
+       if (!chan->hp_iter_enable)
+               return NOTIFY_DONE;
+
+       CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
+
+       switch (action) {
+       case CPU_DOWN_FAILED:
+       case CPU_DOWN_FAILED_FROZEN:
+       case CPU_ONLINE:
+       case CPU_ONLINE_FROZEN:
+               ring_buffer_iterator_init(chan, buf);
+               return NOTIFY_OK;
+       default:
+               return NOTIFY_DONE;
+       }
+}
+#endif
+
+int channel_iterator_init(struct channel *chan)
+{
+       const struct ring_buffer_config *config = chan->backend.config;
+       struct ring_buffer *buf;
+
+       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+               int cpu, ret;
+
+               INIT_LIST_HEAD(&chan->iter.empty_head);
+               ret = heap_init(&chan->iter.heap,
+                               num_possible_cpus()
+                               * sizeof(struct ring_buffer *),
+                               GFP_KERNEL, buf_is_higher);
+               if (ret)
+                       return ret;
+               /*
+                * Without CPU hotplug support, if the ring buffer is allocated
+                * in an early initcall, it will not be notified of secondary
+                * CPUs coming online. In that case, we need to allocate for all
+                * possible CPUs.
+                */
+#ifdef CONFIG_HOTPLUG_CPU
+               chan->hp_iter_notifier.notifier_call =
+                       channel_iterator_cpu_hotplug;
+               chan->hp_iter_notifier.priority = 10;
+               register_cpu_notifier(&chan->hp_iter_notifier);
+               get_online_cpus();
+               for_each_online_cpu(cpu) {
+                       buf = per_cpu_ptr(chan->backend.buf, cpu);
+                       ring_buffer_iterator_init(chan, buf);
+               }
+               chan->hp_iter_enable = 1;
+               put_online_cpus();
+#else
+               for_each_possible_cpu(cpu) {
+                       buf = per_cpu_ptr(chan->backend.buf, cpu);
+                       ring_buffer_iterator_init(chan, buf);
+               }
+#endif
+       } else {
+               buf = channel_get_ring_buffer(config, chan, 0);
+               ring_buffer_iterator_init(chan, buf);
+       }
+       return 0;
+}
+
+void channel_iterator_unregister_notifiers(struct channel *chan)
+{
+       const struct ring_buffer_config *config = chan->backend.config;
+
+       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+               chan->hp_iter_enable = 0;
+               unregister_cpu_notifier(&chan->hp_iter_notifier);
+       }
+}
+
+void channel_iterator_free(struct channel *chan)
+{
+       const struct ring_buffer_config *config = chan->backend.config;
+
+       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+               heap_free(&chan->iter.heap);
+}
+
+int ring_buffer_iterator_open(struct ring_buffer *buf)
+{
+       struct channel *chan = buf->backend.chan;
+       const struct ring_buffer_config *config = chan->backend.config;
+
+       CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
+       return ring_buffer_open_read(buf);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_iterator_open);
+
+/*
+ * Note: Iterators must not be mixed with other types of outputs, because an
+ * iterator can leave the buffer in "GET" state, which is not consistent with
+ * other types of output (mmap, splice, raw data read).
+ */
+void ring_buffer_iterator_release(struct ring_buffer *buf)
+{
+       ring_buffer_release_read(buf);
+}
+EXPORT_SYMBOL_GPL(ring_buffer_iterator_release);
+
+int channel_iterator_open(struct channel *chan)
+{
+       const struct ring_buffer_config *config = chan->backend.config;
+       struct ring_buffer *buf;
+       int ret = 0, cpu;
+
+       CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
+
+       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+               get_online_cpus();
+               /* Allow CPU hotplug to keep track of opened reader */
+               chan->iter.read_open = 1;
+               for_each_channel_cpu(cpu, chan) {
+                       buf = channel_get_ring_buffer(config, chan, cpu);
+                       ret = ring_buffer_iterator_open(buf);
+                       if (ret)
+                               goto error;
+                       buf->iter.read_open = 1;
+               }
+               put_online_cpus();
+       } else {
+               buf = channel_get_ring_buffer(config, chan, 0);
+               ret = ring_buffer_iterator_open(buf);
+       }
+       return ret;
+error:
+       /* Error should always happen on CPU 0, hence no close is required. */
+       CHAN_WARN_ON(chan, cpu != 0);
+       put_online_cpus();
+       return ret;
+}
+EXPORT_SYMBOL_GPL(channel_iterator_open);
+
+void channel_iterator_release(struct channel *chan)
+{
+       const struct ring_buffer_config *config = chan->backend.config;
+       struct ring_buffer *buf;
+       int cpu;
+
+       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+               get_online_cpus();
+               for_each_channel_cpu(cpu, chan) {
+                       buf = channel_get_ring_buffer(config, chan, cpu);
+                       if (buf->iter.read_open) {
+                               ring_buffer_iterator_release(buf);
+                               buf->iter.read_open = 0;
+                       }
+               }
+               chan->iter.read_open = 0;
+               put_online_cpus();
+       } else {
+               buf = channel_get_ring_buffer(config, chan, 0);
+               ring_buffer_iterator_release(buf);
+       }
+}
+EXPORT_SYMBOL_GPL(channel_iterator_release);
+
+void ring_buffer_iterator_reset(struct ring_buffer *buf)
+{
+       struct channel *chan = buf->backend.chan;
+
+       if (buf->iter.state != ITER_GET_SUBBUF)
+               ring_buffer_put_next_subbuf(buf);
+       buf->iter.state = ITER_GET_SUBBUF;
+       /* Remove from heap (if present). */
+       if (heap_cherrypick(&chan->iter.heap, buf))
+               list_add(&buf->iter.empty_node, &chan->iter.empty_head);
+       buf->iter.timestamp = 0;
+       buf->iter.header_len = 0;
+       buf->iter.payload_len = 0;
+       buf->iter.consumed = 0;
+       buf->iter.read_offset = 0;
+       buf->iter.data_size = 0;
+       /* Don't reset allocated and read_open */
+}
+
+void channel_iterator_reset(struct channel *chan)
+{
+       const struct ring_buffer_config *config = chan->backend.config;
+       struct ring_buffer *buf;
+       int cpu;
+
+       /* Empty the heap, put buffers back into empty_head */
+       while ((buf = heap_remove(&chan->iter.heap)) != NULL)
+               list_add(&buf->iter.empty_node, &chan->iter.empty_head);
+
+       for_each_channel_cpu(cpu, chan) {
+               buf = channel_get_ring_buffer(config, chan, cpu);
+               ring_buffer_iterator_reset(buf);
+       }
+       /* Don't reset read_open */
+       chan->iter.last_qs = 0;
+       chan->iter.last_timestamp = 0;
+       chan->iter.last_cpu = 0;
+       chan->iter.len_left = 0;
+}
+
+/*
+ * Ring buffer payload extraction read() implementation.
+ */
+static
+ssize_t channel_ring_buffer_file_read(struct file *filp,
+                                     char __user *user_buf,
+                                     size_t count,
+                                     loff_t *ppos,
+                                     struct channel *chan,
+                                     struct ring_buffer *buf,
+                                     int fusionmerge)
+{
+       const struct ring_buffer_config *config = chan->backend.config;
+       size_t read_count = 0, read_offset;
+       ssize_t len;
+
+       might_sleep();
+       if (!access_ok(VERIFY_WRITE, user_buf, count))
+               return -EFAULT;
+
+       /* Finish copy of previous record */
+       if (*ppos != 0) {
+               if (read_count < count) {
+                       len = chan->iter.len_left;
+                       read_offset = *ppos;
+                       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU
+                           && fusionmerge)
+                               buf = heap_maximum(&chan->iter.heap);
+                       CHAN_WARN_ON(chan, !buf);
+                       goto skip_get_next;
+               }
+       }
+
+       while (read_count < count) {
+               size_t copy_len, space_left;
+
+               if (fusionmerge)
+                       len = channel_get_next_record(chan, &buf);
+               else
+                       len = ring_buffer_get_next_record(chan, buf);
+len_test:
+               if (len < 0) {
+                       /*
+                        * Check if buffer is finalized (end of file).
+                        */
+                       if (len == -ENODATA) {
+                               /* A zero read_count signals end of file */
+                               goto nodata;
+                       }
+                       if (filp->f_flags & O_NONBLOCK) {
+                               if (!read_count)
+                                       read_count = -EAGAIN;
+                               goto nodata;
+                       } else {
+                               int error;
+
+                               /*
+                                * No data available at the moment, return what
+                                * we got.
+                                */
+                               if (read_count)
+                                       goto nodata;
+
+                               /*
+                                * Wait for returned len to be >= 0 or -ENODATA.
+                                */
+                               if (fusionmerge)
+                                       error = wait_event_interruptible(
+                                         chan->read_wait,
+                                         ((len = channel_get_next_record(chan,
+                                               &buf)), len != -EAGAIN));
+                               else
+                                       error = wait_event_interruptible(
+                                         buf->read_wait,
+                                         ((len = ring_buffer_get_next_record(
+                                                 chan, buf)), len != -EAGAIN));
+                               CHAN_WARN_ON(chan, len == -EBUSY);
+                               if (error) {
+                                       read_count = error;
+                                       goto nodata;
+                               }
+                               CHAN_WARN_ON(chan, len < 0 && len != -ENODATA);
+                               goto len_test;
+                       }
+               }
+               read_offset = buf->iter.read_offset;
+skip_get_next:
+               space_left = count - read_count;
+               if (len <= space_left) {
+                       copy_len = len;
+                       chan->iter.len_left = 0;
+                       *ppos = 0;
+               } else {
+                       copy_len = space_left;
+                       chan->iter.len_left = len - copy_len;
+                       *ppos = read_offset + copy_len;
+               }
+               if (__ring_buffer_copy_to_user(&buf->backend, read_offset,
+                                              &user_buf[read_count],
+                                              copy_len)) {
+                       /*
+                        * Leave the len_left and ppos values at their current
+                        * state, as we currently have a valid event to read.
+                        */
+                       return -EFAULT;
+               }
+               read_count += copy_len;
+       }
+       return read_count;
+
+nodata:
+       *ppos = 0;
+       chan->iter.len_left = 0;
+       return read_count;
+}
+
+/**
+ * ring_buffer_file_read - Read buffer record payload.
+ * @filp: file structure pointer.
+ * @user_buf: user buffer to read data into.
+ * @count: number of bytes to read.
+ * @ppos: file read position.
+ *
+ * Returns a negative value on error, or the number of bytes read on success.
+ * ppos is used to save the position _within the current record_ between calls
+ * to read().
+ */
+static
+ssize_t ring_buffer_file_read(struct file *filp,
+                             char __user *user_buf,
+                             size_t count,
+                             loff_t *ppos)
+{
+       struct inode *inode = filp->f_dentry->d_inode;
+       struct ring_buffer *buf = inode->i_private;
+       struct channel *chan = buf->backend.chan;
+
+       return channel_ring_buffer_file_read(filp, user_buf, count, ppos,
+                                            chan, buf, 0);
+}
+
+/**
+ * channel_file_read - Read channel record payload.
+ * @filp: file structure pointer.
+ * @user_buf: user buffer to read data into.
+ * @count: number of bytes to read.
+ * @ppos: file read position.
+ *
+ * Returns a negative value on error, or the number of bytes read on success.
+ * ppos is used to save the position _within the current record_ between calls
+ * to read().
+ */
+static
+ssize_t channel_file_read(struct file *filp,
+                         char __user *user_buf,
+                         size_t count,
+                         loff_t *ppos)
+{
+       struct inode *inode = filp->f_dentry->d_inode;
+       struct channel *chan = inode->i_private;
+       const struct ring_buffer_config *config = chan->backend.config;
+
+       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+               return channel_ring_buffer_file_read(filp, user_buf, count,
+                                                    ppos, chan, NULL, 1);
+       else {
+               struct ring_buffer *buf =
+                       channel_get_ring_buffer(config, chan, 0);
+               return channel_ring_buffer_file_read(filp, user_buf, count,
+                                                    ppos, chan, buf, 0);
+       }
+}
+
+static
+int ring_buffer_file_open(struct inode *inode, struct file *file)
+{
+       struct ring_buffer *buf = inode->i_private;
+       int ret;
+
+       ret = ring_buffer_iterator_open(buf);
+       if (ret)
+               return ret;
+
+       file->private_data = buf;
+       ret = nonseekable_open(inode, file);
+       if (ret)
+               goto release_iter;
+       return 0;
+
+release_iter:
+       ring_buffer_iterator_release(buf);
+       return ret;
+}
+
+static
+int ring_buffer_file_release(struct inode *inode, struct file *file)
+{
+       struct ring_buffer *buf = inode->i_private;
+
+       ring_buffer_iterator_release(buf);
+       return 0;
+}
+
+static
+int channel_file_open(struct inode *inode, struct file *file)
+{
+       struct channel *chan = inode->i_private;
+       int ret;
+
+       ret = channel_iterator_open(chan);
+       if (ret)
+               return ret;
+
+       file->private_data = chan;
+       ret = nonseekable_open(inode, file);
+       if (ret)
+               goto release_iter;
+       return 0;
+
+release_iter:
+       channel_iterator_release(chan);
+       return ret;
+}
+
+static
+int channel_file_release(struct inode *inode, struct file *file)
+{
+       struct channel *chan = inode->i_private;
+
+       channel_iterator_release(chan);
+       return 0;
+}
+
+const struct file_operations channel_payload_file_operations = {
+       .open = channel_file_open,
+       .release = channel_file_release,
+       .read = channel_file_read,
+       .llseek = ring_buffer_no_llseek,
+};
+EXPORT_SYMBOL_GPL(channel_payload_file_operations);
+
+const struct file_operations ring_buffer_payload_file_operations = {
+       .open = ring_buffer_file_open,
+       .release = ring_buffer_file_release,
+       .read = ring_buffer_file_read,
+       .llseek = ring_buffer_no_llseek,
+};
+EXPORT_SYMBOL_GPL(ring_buffer_payload_file_operations);
Index: linux.trees.git/include/linux/ringbuffer/iterator.h
===================================================================
--- /dev/null   1970-01-01 00:00:00.000000000 +0000
+++ linux.trees.git/include/linux/ringbuffer/iterator.h 2010-08-16 16:09:23.000000000 -0400
@@ -0,0 +1,70 @@
+#ifndef _LINUX_RING_BUFFER_ITERATOR_H
+#define _LINUX_RING_BUFFER_ITERATOR_H
+
+/*
+ * linux/ringbuffer/iterator.h
+ *
+ * (C) Copyright 2010 - Mathieu Desnoyers <[email protected]>
+ *
+ * Ring buffer and channel iterators.
+ *
+ * Author:
+ *     Mathieu Desnoyers <[email protected]>
+ *
+ * Dual LGPL v2.1/GPL v2 license.
+ */
+
+#include <linux/ringbuffer/backend.h>
+#include <linux/ringbuffer/frontend.h>
+
+/*
+ * ring_buffer_get_next_record advances the buffer read position to the next
+ * record. It returns either the size of the next record, -EAGAIN if there is
+ * currently no data available, or -ENODATA if no data is available and the
+ * buffer is finalized.
+ */
+extern ssize_t ring_buffer_get_next_record(struct channel *chan,
+                                          struct ring_buffer *buf);
+
+/*
+ * channel_get_next_record advances the buffer read position to the next
+ * record. It returns either the size of the next record, -EAGAIN if there is
+ * currently no data available, or -ENODATA if no data is available and all
+ * buffers are finalized.
+ * Returns the current buffer in ret_buf.
+ */
+extern ssize_t channel_get_next_record(struct channel *chan,
+                                      struct ring_buffer **ret_buf);
+
+/**
+ * read_current_record - copy the buffer current record into dest.
+ * @buf: ring buffer
+ * @dest: destination where the record should be copied
+ *
+ * dest should be large enough to contain the record. Returns the number of
+ * bytes copied.
+ */
+static inline size_t read_current_record(struct ring_buffer *buf, void *dest)
+{
+       return ring_buffer_read(&buf->backend, buf->iter.read_offset,
+                               dest, buf->iter.payload_len);
+}
+
+extern int ring_buffer_iterator_open(struct ring_buffer *buf);
+extern void ring_buffer_iterator_release(struct ring_buffer *buf);
+extern int channel_iterator_open(struct channel *chan);
+extern void channel_iterator_release(struct channel *chan);
+
+extern const struct file_operations channel_payload_file_operations;
+extern const struct file_operations ring_buffer_payload_file_operations;
+
+/*
+ * Used internally.
+ */
+int channel_iterator_init(struct channel *chan);
+void channel_iterator_unregister_notifiers(struct channel *chan);
+void channel_iterator_free(struct channel *chan);
+void channel_iterator_reset(struct channel *chan);
+void ring_buffer_iterator_reset(struct ring_buffer *buf);
+
+#endif /* _LINUX_RING_BUFFER_ITERATOR_H */
Index: linux.trees.git/lib/ringbuffer/Makefile
===================================================================
--- linux.trees.git.orig/lib/ringbuffer/Makefile        2010-08-16 16:09:23.000000000 -0400
+++ linux.trees.git/lib/ringbuffer/Makefile     2010-08-17 16:09:46.000000000 -0400
@@ -1,5 +1,6 @@
 obj-y += ring_buffer_backend.o
 obj-y += ring_buffer_frontend.o
+obj-y += ring_buffer_iterator.o
 obj-y += ring_buffer_vfs.o
 obj-y += ring_buffer_splice.o
 obj-y += ring_buffer_mmap.o

