Add a new lockless ringbuffer implementation to be used for printk.
For now, only add support for writers. Reader support will be added in
a follow-up commit.

Signed-off-by: John Ogness <john.ogn...@linutronix.de>
---
 kernel/printk/printk_ringbuffer.c | 676 ++++++++++++++++++++++++++++++
 kernel/printk/printk_ringbuffer.h | 239 +++++++++++
 2 files changed, 915 insertions(+)
 create mode 100644 kernel/printk/printk_ringbuffer.c
 create mode 100644 kernel/printk/printk_ringbuffer.h

diff --git a/kernel/printk/printk_ringbuffer.c b/kernel/printk/printk_ringbuffer.c
new file mode 100644
index 000000000000..09c32e52fd40
--- /dev/null
+++ b/kernel/printk/printk_ringbuffer.c
@@ -0,0 +1,676 @@
+// SPDX-License-Identifier: GPL-2.0
+
+#include <linux/kernel.h>
+#include <linux/irqflags.h>
+#include <linux/string.h>
+#include <linux/errno.h>
+#include <linux/bug.h>
+#include "printk_ringbuffer.h"
+
+/**
+ * DOC: printk_ringbuffer overview
+ *
+ * Data Structure
+ * --------------
+ * The printk_ringbuffer is made up of 3 internal ringbuffers::
+ *
+ * * desc_ring:      A ring of descriptors. A descriptor contains all record
+ *                   meta-data (sequence number, timestamp, loglevel, etc.) as
+ *                   well as internal state information about the record and
+ *                   logical positions specifying where in the other
+ *                   ringbuffers the text and dictionary strings are located.
+ *
+ * * text_data_ring: A ring of data blocks. A data block consists of a 32-bit
+ *                   integer (ID) that maps to a desc_ring index followed by
+ *                   the text string of the record.
+ *
+ * * dict_data_ring: A ring of data blocks. A data block consists of a 32-bit
+ *                   integer (ID) that maps to a desc_ring index followed by
+ *                   the dictionary string of the record.
+ *
+ * Usage
+ * -----
+ * Here are some simple examples demonstrating writers and readers. For the
+ * examples it is assumed that a global ringbuffer is available::
+ *
+ *     DECLARE_PRINTKRB(rb, 15, 5, 4);
+ *
+ * This ringbuffer has a size of 1 MiB for text data (2 ^ 20), 512 KiB for
+ * dictionary data (2 ^ 19), and allows up to 32768 records (2 ^ 15).
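+ *
+ * For reference, these sizes follow directly from the macro arguments: the
+ * descriptor count is 2^descbits, the text data ring holds
+ * 2^(avgtextbits + descbits) bytes and the dictionary data ring holds
+ * 2^(avgdictbits + descbits) bytes. For the example above::
+ *
+ *     descs: 2^15       = 32768 records
+ *     text:  2^(5 + 15) = 2^20  = 1 MiB
+ *     dict:  2^(4 + 15) = 2^19  = 512 KiB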
+ *
+ * Sample writer code::
+ *
+ *     struct prb_reserved_entry e;
+ *     struct printk_record r;
+ *
+ *     // specify how much to allocate
+ *     r.text_buf_size = strlen(textstr) + 1;
+ *     r.dict_buf_size = strlen(dictstr) + 1;
+ *
+ *     if (prb_reserve(&e, &rb, &r)) {
+ *             snprintf(r.text_buf, r.text_buf_size, "%s", textstr);
+ *
+ *             // dictionary allocation may have failed
+ *             if (r.dict_buf)
+ *                     snprintf(r.dict_buf, r.dict_buf_size, "%s", dictstr);
+ *
+ *             r.info->ts_nsec = local_clock();
+ *
+ *             prb_commit(&e);
+ *     }
+ *
+ * Sample reader code::
+ *
+ *     struct printk_info info;
+ *     char text_buf[32];
+ *     char dict_buf[32];
+ *     u64 next_seq = 0;
+ *     struct printk_record r = {
+ *             .info           = &info,
+ *             .text_buf       = &text_buf[0],
+ *             .dict_buf       = &dict_buf[0],
+ *             .text_buf_size  = sizeof(text_buf),
+ *             .dict_buf_size  = sizeof(dict_buf),
+ *     };
+ *
+ *     while (prb_read_valid(&rb, next_seq, &r)) {
+ *             if (info.seq != next_seq)
+ *                     pr_warn("lost %llu records\n", info.seq - next_seq);
+ *
+ *             if (info.text_len > r.text_buf_size) {
+ *                     pr_warn("record %llu text truncated\n", info.seq);
+ *                     text_buf[sizeof(text_buf) - 1] = 0;
+ *             }
+ *
+ *             if (info.dict_len > r.dict_buf_size) {
+ *                     pr_warn("record %llu dict truncated\n", info.seq);
+ *                     dict_buf[sizeof(dict_buf) - 1] = 0;
+ *             }
+ *
+ *             pr_info("%llu: %llu: %s;%s\n", info.seq, info.ts_nsec,
+ *                     &text_buf[0], info.dict_len ? &dict_buf[0] : "");
+ *
+ *             next_seq = info.seq + 1;
+ *     }
+ */
+
+#define DATA_SIZE(data_ring)           _DATA_SIZE((data_ring)->size_bits)
+#define DATA_SIZE_MASK(data_ring)      (DATA_SIZE(data_ring) - 1)
+
+#define DESCS_COUNT(desc_ring)         _DESCS_COUNT((desc_ring)->count_bits)
+#define DESCS_COUNT_MASK(desc_ring)    (DESCS_COUNT(desc_ring) - 1)
+
+/* Determine the data array index from a logical position. */
+#define DATA_INDEX(data_ring, lpos)    ((lpos) & DATA_SIZE_MASK(data_ring))
+
+/* Determine the desc array index from an ID or sequence number. */
+#define DESC_INDEX(desc_ring, n)       ((n) & DESCS_COUNT_MASK(desc_ring))
+
+/* Determine how many times the data array has wrapped. */
+#define DATA_WRAPS(data_ring, lpos)    ((lpos) >> (data_ring)->size_bits)
+
+/* Get the logical position at index 0 of the current wrap. */
+#define DATA_THIS_WRAP_START_LPOS(data_ring, lpos) \
+       ((lpos) & ~DATA_SIZE_MASK(data_ring))
+
+/* Get the ID for the same index of the previous wrap as the given ID. */
+#define DESC_ID_PREV_WRAP(desc_ring, id) \
+       DESC_ID((id) - DESCS_COUNT(desc_ring))
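+
+/*
+ * Worked example for the lpos macros above (hypothetical values, assuming
+ * size_bits = 20): an lpos of 0x00300007 yields DATA_INDEX() = 0x7 and
+ * DATA_WRAPS() = 3, i.e. the data array has wrapped 3 times and the byte
+ * lives at array index 7 of the current wrap.
+ */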
+
+/* A data block: maps to the raw data within the data ring. */
+struct prb_data_block {
+       u32     id;
+       u8      data[0];
+};
+
+static struct prb_desc *to_desc(struct prb_desc_ring *desc_ring, u64 n)
+{
+       return &desc_ring->descs[DESC_INDEX(desc_ring, n)];
+}
+
+static struct prb_data_block *to_block(struct prb_data_ring *data_ring,
+                                      unsigned long begin_lpos)
+{
+       char *data = &data_ring->data[DATA_INDEX(data_ring, begin_lpos)];
+
+       return (struct prb_data_block *)data;
+}
+
+/* Increase the data size to account for data block meta-data. */
+static unsigned long to_blk_size(unsigned long size)
+{
+       struct prb_data_block *db = NULL;
+
+       size += sizeof(*db);
+       size = ALIGN(size, sizeof(db->id));
+       return size;
+}
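+
+/*
+ * Example (assuming a 4-byte block ID): a 13-byte string needs
+ * 13 + 4 = 17 bytes, which to_blk_size() pads up to 20 bytes so that the
+ * ID of the following data block stays naturally aligned.
+ */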
+
+/*
+ * Sanity checker for reserve size. The ringbuffer code assumes that a data
+ * block does not exceed the maximum possible size that could fit within the
+ * ringbuffer. This function provides that basic size check so that the
+ * assumption is safe.
+ */
+static bool data_check_size(struct prb_data_ring *data_ring, unsigned int size)
+{
+       struct prb_data_block *db = NULL;
+
+       /* Writers are not allowed to write data-less records. */
+       if (size == 0)
+               return false;
+
+       /*
+        * Ensure the alignment padded size could possibly fit in the data
+        * array. The largest possible data block must still leave room for
+        * at least the ID of the next block.
+        */
+       size = to_blk_size(size);
+       if (size > DATA_SIZE(data_ring) - sizeof(db->id))
+               return false;
+
+       return true;
+}
+
+/* The possible responses of a descriptor state-query. */
+enum desc_state {
+       desc_miss,      /* ID mismatch */
+       desc_reserved,  /* reserved, but still in use by writer */
+       desc_committed, /* committed, writer is done */
+       desc_reusable,  /* free, not used by any writer */
+};
+
+/* Query the state of a descriptor. */
+static enum desc_state get_desc_state(u32 id, u32 state_val)
+{
+       if (id != DESC_ID(state_val))
+               return desc_miss;
+
+       if (state_val & DESC_REUSE_MASK)
+               return desc_reusable;
+
+       if (state_val & DESC_COMMITTED_MASK)
+               return desc_committed;
+
+       return desc_reserved;
+}
+
+/* Get a copy of a specified descriptor and its state. */
+static enum desc_state desc_read(struct prb_desc_ring *desc_ring, u32 id,
+                                struct prb_desc *desc_out)
+{
+       struct prb_desc *desc = to_desc(desc_ring, id);
+       atomic_t *state_var = &desc->state_var;
+       enum desc_state d_state;
+       u32 state_val;
+
+       state_val = atomic_read(state_var);
+       d_state = get_desc_state(id, state_val);
+       if (d_state == desc_miss || d_state == desc_reserved)
+               return d_state;
+
+       /* Check state before loading data. */
+       smp_rmb(); /* matches LMM_REF(prb_commit:A) */
+
+       *desc_out = READ_ONCE(*desc);
+
+       /* Load data before re-checking state. */
+       smp_rmb(); /* matches LMM_REF(desc_reserve:A) */
+
+       state_val = atomic_read(state_var);
+       return get_desc_state(id, state_val);
+}
+
+/*
+ * Take a given descriptor out of the committed state by attempting
+ * the transition: committed->reusable. Either this task or some
+ * other task will have been successful.
+ */
+static void desc_make_reusable(struct prb_desc_ring *desc_ring, u32 id)
+{
+       struct prb_desc *desc = to_desc(desc_ring, id);
+       atomic_t *state_var = &desc->state_var;
+       u32 val_committed = id | DESC_COMMITTED_MASK;
+       u32 val_reusable = val_committed | DESC_REUSE_MASK;
+
+       atomic_cmpxchg(state_var, val_committed, val_reusable);
+}
+
+/*
+ * For a given data ring (text or dict) and its current tail lpos:
+ * for each data block up until @lpos, make the associated descriptor
+ * reusable.
+ *
+ * If there is any problem making the associated descriptor reusable,
+ * either the descriptor has not yet been committed or another writer
+ * task has already pushed the tail lpos past the problematic data
+ * block. Regardless, on error the caller can re-load the tail lpos
+ * to determine the situation.
+ */
+static bool data_make_reusable(struct printk_ringbuffer *rb,
+                              struct prb_data_ring *data_ring,
+                              unsigned long tail_lpos, unsigned long lpos,
+                              unsigned long *lpos_out)
+{
+       struct prb_desc_ring *desc_ring = &rb->desc_ring;
+       struct prb_data_blk_lpos *blk_lpos;
+       struct prb_data_block *blk;
+       enum desc_state d_state;
+       struct prb_desc desc;
+       u32 id;
+
+       /*
+        * Point @blk_lpos to the correct blk_lpos with the local copy
+        * of the descriptor based on the provided @data_ring.
+        */
+       if (data_ring == &rb->text_data_ring)
+               blk_lpos = &desc.text_blk_lpos;
+       else
+               blk_lpos = &desc.dict_blk_lpos;
+
+       /* Loop until @tail_lpos has advanced to or beyond @lpos. */
+       while ((lpos - tail_lpos) - 1 < DATA_SIZE(data_ring)) {
+               blk = to_block(data_ring, tail_lpos);
+               id = READ_ONCE(blk->id);
+
+               d_state = desc_read(desc_ring, id, &desc);
+
+               switch (d_state) {
+               case desc_miss:
+                       return false;
+               case desc_reserved:
+                       return false;
+               case desc_committed:
+                       /* Does the descriptor link to this data block? */
+                       if (blk_lpos->begin != tail_lpos)
+                               return false;
+                       desc_make_reusable(desc_ring, id);
+                       break;
+               case desc_reusable:
+                       /* Does the descriptor link to this data block? */
+                       if (blk_lpos->begin != tail_lpos)
+                               return false;
+                       break;
+               }
+
+               /* Advance @tail_lpos to the next data block. */
+               tail_lpos = blk_lpos->next;
+       }
+
+       *lpos_out = tail_lpos;
+
+       return true;
+}
+
+/*
+ * Advance the data ring tail to at least @lpos. This function puts all
+ * descriptors into the reusable state if the tail will be pushed beyond their
+ * associated data block.
+ */
+static bool data_push_tail(struct printk_ringbuffer *rb,
+                          struct prb_data_ring *data_ring, unsigned long lpos)
+{
+       unsigned long tail_lpos;
+       unsigned long next_lpos;
+
+       /* If @lpos is not valid, there is nothing to do. */
+       if (lpos == INVALID_LPOS)
+               return true;
+
+       tail_lpos = atomic_long_read(&data_ring->tail_lpos);
+
+       do {
+               /* If @lpos is no longer valid, there is nothing to do. */
+               if (lpos - tail_lpos >= DATA_SIZE(data_ring))
+                       break;
+
+               /*
+                * Make all descriptors reusable that are associated with
+                * data blocks before @lpos.
+                */
+               if (!data_make_reusable(rb, data_ring, tail_lpos, lpos,
+                                       &next_lpos)) {
+                       /*
+                        * data_make_reusable() performed state loads. Make
+                        * sure they are done before reloading the tail lpos.
+                        */
+                       smp_rmb(); /* matches LMM_REF(data_push_tail:A) */
+
+                       /*
+                        * Failure may be due to another task pushing the
+                        * tail. Reload the tail and check.
+                        */
+                       next_lpos = atomic_long_read(&data_ring->tail_lpos);
+                       if (next_lpos == tail_lpos)
+                               return false;
+
+                       /* Another task pushed the tail. Try again. */
+                       tail_lpos = next_lpos;
+                       continue;
+               }
+       /*
+        * Advance the tail lpos (invalidating the data block) before
+        * allowing future changes (such as the recycling of the
+        * associated descriptor).
+        */
+       } while (!atomic_long_try_cmpxchg(&data_ring->tail_lpos, &tail_lpos,
+                               next_lpos)); /* LMM_TAG(data_push_tail:A) */
+
+       return true;
+}
+
+/*
+ * Advance the desc ring tail. This function advances the tail by one
+ * descriptor, thus invalidating the oldest descriptor. Before advancing
+ * the tail, the tail descriptor is made reusable and all data blocks up to
+ * and including the descriptor's data block are invalidated (i.e. the data
+ * ring tail is pushed past the data block of the descriptor being made
+ * reusable).
+ */
+static bool desc_push_tail(struct printk_ringbuffer *rb, u32 tail_id)
+{
+       struct prb_desc_ring *desc_ring = &rb->desc_ring;
+       enum desc_state d_state;
+       struct prb_desc desc;
+
+       d_state = desc_read(desc_ring, tail_id, &desc);
+
+       switch (d_state) {
+       case desc_miss:
+               /*
+                * If the ID is exactly 1 wrap behind the expected, it is
+                * being reserved by another writer and cannot be invalidated.
+                */
+               if (DESC_ID(atomic_read(&desc.state_var)) ==
+                   DESC_ID_PREV_WRAP(desc_ring, tail_id)) {
+                       return false;
+               }
+               return true;
+       case desc_reserved:
+               return false;
+       case desc_committed:
+               desc_make_reusable(desc_ring, tail_id);
+               break;
+       case desc_reusable:
+               break;
+       }
+
+       /*
+        * Data blocks must be invalidated before their associated
+        * descriptor can be made available for recycling. Invalidating
+        * them later is not possible because there is no way to trust
+        * data blocks once their associated descriptor is gone.
+        */
+
+       if (!data_push_tail(rb, &rb->text_data_ring, desc.text_blk_lpos.next))
+               return false;
+       if (!data_push_tail(rb, &rb->dict_data_ring, desc.dict_blk_lpos.next))
+               return false;
+
+       /*
+        * Check the next descriptor after @tail_id before pushing the tail to
+        * it because the tail must always be in a committed or reusable
+        * state. The implementation of get_desc_tail_seq() relies on this.
+        *
+ * A successful read also implies that the ID of the next descriptor is
+ * <= @head_id, so there is no risk of pushing the tail past the head.
+        */
+       d_state = desc_read(desc_ring, DESC_ID(tail_id + 1), &desc);
+       if (d_state == desc_committed || d_state == desc_reusable) {
+               atomic_cmpxchg(&desc_ring->tail_id, tail_id,
+                              DESC_ID(tail_id + 1));
+       } else {
+               /*
+                * The descriptor following @tail_id is not in an allowed
+                * tail state. But if the tail has since been moved by
+                * another task already, then it does not matter.
+                */
+               if (atomic_read(&desc_ring->tail_id) == tail_id)
+                       return false;
+       }
+
+       return true;
+}
+
+/* Reserve a new descriptor, invalidating the oldest if necessary. */
+static bool desc_reserve(struct printk_ringbuffer *rb, u32 *id_out)
+{
+       struct prb_desc_ring *desc_ring = &rb->desc_ring;
+       struct prb_desc *desc;
+       u32 id_prev_wrap;
+       u32 head_id;
+       u32 id;
+
+       head_id = atomic_read(&desc_ring->head_id);
+
+       do {
+               desc = to_desc(desc_ring, head_id);
+
+               id = DESC_ID(head_id + 1);
+               id_prev_wrap = DESC_ID_PREV_WRAP(desc_ring, id);
+
+               if (id_prev_wrap == atomic_read(&desc_ring->tail_id)) {
+                       if (!desc_push_tail(rb, id_prev_wrap))
+                               return false;
+               }
+       } while (!atomic_try_cmpxchg(&desc_ring->head_id, &head_id, id));
+
+       desc = to_desc(desc_ring, id);
+
+       /* Set the descriptor's ID and also set its state to reserved. */
+       atomic_set(&desc->state_var, id | 0);
+
+       /* Store new ID/state before making any other changes. */
+       smp_wmb(); /* LMM_TAG(desc_reserve:A) */
+
+       *id_out = id;
+       return true;
+}
+
+/* Determine the end of a data block. */
+static unsigned long get_next_lpos(struct prb_data_ring *data_ring,
+                                  unsigned long lpos, unsigned int size)
+{
+       unsigned long begin_lpos;
+       unsigned long next_lpos;
+
+       begin_lpos = lpos;
+       next_lpos = lpos + size;
+
+       if (DATA_WRAPS(data_ring, begin_lpos) ==
+           DATA_WRAPS(data_ring, next_lpos)) {
+               /* The data block does not wrap. */
+               return next_lpos;
+       }
+
+       /* Wrapping data blocks store their data at the beginning. */
+       return (DATA_THIS_WRAP_START_LPOS(data_ring, next_lpos) + size);
+}
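+
+/*
+ * Example (hypothetical numbers, DATA_SIZE() = 1 MiB): a 16-byte block
+ * starting at lpos 0x000ffffa would cross the wrap boundary, so
+ * get_next_lpos() returns 0x00100010. The block's data is then stored at
+ * the beginning of the new wrap (array index 0) and the last few bytes
+ * of the previous wrap are left unused.
+ */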
+
+/*
+ * Allocate a new data block, invalidating the oldest data block(s)
+ * if necessary. This function also associates the data block with
+ * a specified descriptor.
+ */
+static char *data_alloc(struct printk_ringbuffer *rb,
+                       struct prb_data_ring *data_ring, unsigned long size,
+                       struct prb_data_blk_lpos *blk_lpos, u32 id)
+{
+       struct prb_data_block *blk;
+       unsigned long begin_lpos;
+       unsigned long next_lpos;
+
+       if (size == 0) {
+               /* Specify a data-less block. */
+               blk_lpos->begin = INVALID_LPOS;
+               blk_lpos->next = INVALID_LPOS;
+               return NULL;
+       }
+
+       size = to_blk_size(size);
+
+       begin_lpos = atomic_long_read(&data_ring->head_lpos);
+
+       do {
+               next_lpos = get_next_lpos(data_ring, begin_lpos, size);
+
+               if (!data_push_tail(rb, data_ring,
+                                   next_lpos - DATA_SIZE(data_ring))) {
+                       /* Failed to allocate, specify a data-less block. */
+                       blk_lpos->begin = INVALID_LPOS;
+                       blk_lpos->next = INVALID_LPOS;
+                       return NULL;
+               }
+       } while (!atomic_long_try_cmpxchg(&data_ring->head_lpos, &begin_lpos,
+                                         next_lpos));
+
+       /*
+        * No barrier is needed here. The validity of the data is defined
+        * by the state of the associated descriptor, which is still marked
+        * as reserved (i.e. invalid). Also, only the winner of the above
+        * cmpxchg() can write here.
+        */
+
+       blk = to_block(data_ring, begin_lpos);
+       blk->id = id;
+
+       if (DATA_WRAPS(data_ring, begin_lpos) !=
+           DATA_WRAPS(data_ring, next_lpos)) {
+               /* Wrapping data blocks store their data at the beginning. */
+               blk = to_block(data_ring, 0);
+               blk->id = id;
+       }
+
+       blk_lpos->begin = begin_lpos;
+       blk_lpos->next = next_lpos;
+
+       return &blk->data[0];
+}
+
+/**
+ * prb_reserve() - Reserve space in the ringbuffer.
+ *
+ * @e:  The entry structure to setup.
+ * @rb: The ringbuffer to reserve data in.
+ * @r:  The record structure to allocate buffers for.
+ *
+ * This is the public function available to writers to reserve data.
+ *
+ * The writer specifies the text and dict sizes to reserve by setting the
+ * @text_buf_size and @dict_buf_size fields of @r, respectively. Dictionaries
+ * are optional, so @dict_buf_size is allowed to be 0.
+ *
+ * Context: Any context. Disables local interrupts on success.
+ * Return: true if at least text data could be allocated, otherwise false.
+ *
+ * On success, the fields @info, @text_buf, @dict_buf of @r will be set by
+ * this function and should be filled in by the writer before committing.
+ *
+ * If the function fails to reserve dictionary space (but all else succeeded),
+ * it will still report success. In that case @dict_buf is set to NULL and
+ * @dict_buf_size is set to 0. Writers must check this before writing to
+ * dictionary space.
+ */
+bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
+                struct printk_record *r)
+{
+       struct prb_desc_ring *desc_ring = &rb->desc_ring;
+       struct prb_desc *d;
+       u32 id;
+
+       if (!data_check_size(&rb->text_data_ring, r->text_buf_size))
+               return false;
+
+       /* Records without dictionaries are allowed. */
+       if (r->dict_buf_size) {
+               if (!data_check_size(&rb->dict_data_ring, r->dict_buf_size))
+                       return false;
+       }
+
+       /* Disable interrupts during the reserve/commit window. */
+       local_irq_save(e->irqflags);
+
+       if (!desc_reserve(rb, &id)) {
+               /* Descriptor reservation failures are tracked. */
+               atomic_long_inc(&rb->fail);
+               local_irq_restore(e->irqflags);
+               return false;
+       }
+
+       d = to_desc(desc_ring, id);
+
+       /*
+        * Set the @e fields here so that prb_commit() can be used if
+        * text data allocation fails.
+        */
+       e->rb = rb;
+       e->id = id;
+
+       /*
+        * Initialize the sequence number if it has never been set.
+        * Otherwise just increment it by a full wrap.
+        *
+        * @seq is considered "never been set" if it has a value of 0,
+        * _except_ for descs[0], which was set by the ringbuffer initializer
+        * and therefore is always considered as set.
+        *
+        * See the "Bootstrap" comment block in printk_ringbuffer.h for
+        * details about how the initializer bootstraps the descriptors.
+        */
+       if (d->info.seq == 0 && DESC_INDEX(desc_ring, id) != 0)
+               d->info.seq = DESC_INDEX(desc_ring, id);
+       else
+               d->info.seq += DESCS_COUNT(desc_ring);
+
+       r->text_buf = data_alloc(rb, &rb->text_data_ring, r->text_buf_size,
+                                &d->text_blk_lpos, id);
+       /* If text data allocation fails, a data-less record is committed. */
+       if (r->text_buf_size && !r->text_buf) {
+               d->info.text_len = 0;
+               d->info.dict_len = 0;
+               prb_commit(e);
+               return false;
+       }
+
+       r->dict_buf = data_alloc(rb, &rb->dict_data_ring, r->dict_buf_size,
+                                &d->dict_blk_lpos, id);
+       /*
+        * If dict data allocation fails, the caller can still commit
+        * text. But dictionary information will not be available.
+        */
+       if (r->dict_buf_size && !r->dict_buf)
+               r->dict_buf_size = 0;
+
+       r->info = &d->info;
+
+       /* Set default values for the lengths. */
+       d->info.text_len = r->text_buf_size;
+       d->info.dict_len = r->dict_buf_size;
+
+       return true;
+}
+EXPORT_SYMBOL(prb_reserve);
+
+/**
+ * prb_commit() - Commit (previously reserved) data to the ringbuffer.
+ *
+ * @e: The entry containing the reserved data information.
+ *
+ * This is the public function available to writers to commit data.
+ *
+ * Context: Any context. Enables local interrupts.
+ */
+void prb_commit(struct prb_reserved_entry *e)
+{
+       struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
+       struct prb_desc *d = to_desc(desc_ring, e->id);
+
+       /* Store all record data before updating the state. */
+       smp_wmb(); /* LMM_TAG(prb_commit:A) */
+
+       atomic_set(&d->state_var, e->id | DESC_COMMITTED_MASK);
+
+       /* Restore interrupts, the reserve/commit window is finished. */
+       local_irq_restore(e->irqflags);
+}
+EXPORT_SYMBOL(prb_commit);
diff --git a/kernel/printk/printk_ringbuffer.h b/kernel/printk/printk_ringbuffer.h
new file mode 100644
index 000000000000..b6f06e5edc1b
--- /dev/null
+++ b/kernel/printk/printk_ringbuffer.h
@@ -0,0 +1,239 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+
+#ifndef _KERNEL_PRINTK_RINGBUFFER_H
+#define _KERNEL_PRINTK_RINGBUFFER_H
+
+#include <linux/atomic.h>
+
+struct printk_info {
+       u64     seq;            /* sequence number */
+       u64     ts_nsec;        /* timestamp in nanoseconds */
+       u16     text_len;       /* length of text buffer */
+       u16     dict_len;       /* length of dictionary buffer */
+       u8      facility;       /* syslog facility */
+       u8      flags:5;        /* internal record flags */
+       u8      level:3;        /* syslog level */
+       u32     caller_id;      /* thread id or processor id */
+};
+
+/*
+ * A structure providing the buffers, used by writers.
+ *
+ * Writers:
+ * The writer sets @text_buf_size and @dict_buf_size before calling
+ * prb_reserve(). On success, prb_reserve() sets @info, @text_buf, @dict_buf.
+ */
+struct printk_record {
+       struct printk_info      *info;
+       char                    *text_buf;
+       char                    *dict_buf;
+       unsigned int            text_buf_size;
+       unsigned int            dict_buf_size;
+};
+
+/* Specifies the position/span of a data block. */
+struct prb_data_blk_lpos {
+       unsigned long   begin;
+       unsigned long   next;
+};
+
+/* A descriptor: the complete meta-data for a record. */
+struct prb_desc {
+       struct printk_info              info;
+       atomic_t                        state_var;
+       struct prb_data_blk_lpos        text_blk_lpos;
+       struct prb_data_blk_lpos        dict_blk_lpos;
+};
+
+/* A ringbuffer of "struct prb_data_block + data" elements. */
+struct prb_data_ring {
+       unsigned int    size_bits;
+       char            *data;
+       atomic_long_t   head_lpos;
+       atomic_long_t   tail_lpos;
+};
+
+/* A ringbuffer of "struct prb_desc" elements. */
+struct prb_desc_ring {
+       unsigned int            count_bits;
+       struct prb_desc         *descs;
+       atomic_t                head_id;
+       atomic_t                tail_id;
+};
+
+/* The high level structure representing the printk ringbuffer. */
+struct printk_ringbuffer {
+       struct prb_desc_ring    desc_ring;
+       struct prb_data_ring    text_data_ring;
+       struct prb_data_ring    dict_data_ring;
+       atomic_long_t           fail;
+};
+
+/* Used by writers as a reserve/commit handle. */
+struct prb_reserved_entry {
+       struct printk_ringbuffer        *rb;
+       unsigned long                   irqflags;
+       u32                             id;
+};
+
+#define _DATA_SIZE(sz_bits)            (1UL << (sz_bits))
+#define _DESCS_COUNT(ct_bits)          (1U << (ct_bits))
+#define DESC_SV_BITS                   (sizeof(int) * 8)
+#define DESC_COMMITTED_MASK            (1U << (DESC_SV_BITS - 1))
+#define DESC_REUSE_MASK                        (1U << (DESC_SV_BITS - 2))
+#define DESC_FLAGS_MASK                        (DESC_COMMITTED_MASK | DESC_REUSE_MASK)
+#define DESC_ID_MASK                   (~DESC_FLAGS_MASK)
+#define DESC_ID(sv)                    ((sv) & DESC_ID_MASK)
+#define INVALID_LPOS                   1
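+
+/*
+ * State variable layout implied by the masks above: the highest bit marks
+ * a committed descriptor, the second-highest bit marks a reusable one, and
+ * the remaining bits hold the descriptor ID.
+ */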
+
+#define INVALID_BLK_LPOS       \
+       {                       \
+               .begin  = INVALID_LPOS, \
+               .next   = INVALID_LPOS, \
+       }
+
+/*
+ * Descriptor Bootstrap
+ *
+ * The descriptor array is minimally initialized to allow immediate usage
+ * by readers and writers. The requirements that the descriptor array
+ * initialization must satisfy:
+ *
+ * R1: The tail must point to an existing (committed or reusable) descriptor.
+ *     This is required by the implementation of get_desc_tail_seq().
+ *
+ * R2: Readers must see that the ringbuffer is initially empty.
+ *
+ * R3: The first record reserved by a writer is assigned sequence number 0.
+ *
+ * To satisfy R1, the tail points to a descriptor that is minimally
+ * initialized (having no data block, i.e. data block's lpos @begin and @next
+ * values are set to INVALID_LPOS).
+ *
+ * To satisfy R2, the tail descriptor is initialized to the reusable state.
+ * Readers recognize reusable descriptors as existing records, but skip over
+ * them.
+ *
+ * To satisfy R3, the last descriptor in the array is used as the initial head
+ * (and tail) descriptor. This allows the first record reserved by a writer
+ * (head + 1) to be the first descriptor in the array. (Only the first
+ * descriptor in the array could have a valid sequence number of 0.)
+ *
+ * The first time a descriptor is reserved, it is assigned a sequence number
+ * with the value of the array index. A "first time reserved" descriptor can
+ * be recognized because it has a sequence number of 0 even though it does not
+ * have an index of 0. (Only the first descriptor in the array could have a
+ * valid sequence number of 0.) After the first reservation, all future
+ * reservations simply involve incrementing the sequence number by the array
+ * count.
+ *
+ * Hack #1:
+ * The first descriptor in the array is allowed to have a sequence number 0.
+ * In this case it is not possible to recognize whether it is being reserved
+ * the first time (set to index value) or has been reserved previously
+ * (increment by the array count). This is handled by _always_ incrementing the
+ * sequence number when reserving the first descriptor in the array. So in
+ * order to satisfy R3, the sequence number of the first descriptor in the
+ * array is initialized to minus the array count. Then, upon the first
+ * reservation, it is incremented to 0.
+ *
+ * Hack #2:
+ * get_desc_tail_seq() can be called at any time by readers to retrieve the
+ * sequence number of the tail descriptor. However, due to R2 and R3,
+ * initially there are no records to report the sequence number of (sequence
+ * numbers are u64 and there is nothing less than 0). To handle this, the
+ * sequence number of the tail descriptor is initialized to 0. Technically
+ * this is incorrect, because there is no record with sequence number 0 (yet)
+ * and the tail descriptor is not the first descriptor in the array. But it
+ * allows prb_read_valid() to correctly report that the record is
+ * non-existent for any given sequence number. Bootstrapping is complete when
+ * the tail is pushed the first time, thus finally pointing to the first
+ * descriptor reserved by a writer, which has the assigned sequence number 0.
+ */
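+
+/*
+ * Worked example of the bootstrap rules (hypothetical, with only
+ * 2^2 = 4 descriptors): descs[3] is the initial head/tail, reusable and
+ * with seq = 0 (Hack #2); descs[0] starts with seq = -4 (Hack #1). The
+ * first reservation takes descs[0] and adds the array count to its seq,
+ * yielding 0 (satisfying R3). The next reservation takes descs[1]; its
+ * seq is 0 but its index is not, so it is recognized as "first time
+ * reserved" and assigned seq = 1. From then on, re-reserving any
+ * descriptor simply adds the array count (4) to its seq.
+ */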
+
+/*
+ * Initiating Logical Value Overflows
+ *
+ * Both logical position (lpos) and ID values can be mapped to array indexes
+ * but may experience overflows during the lifetime of the system. To ensure
+ * that printk_ringbuffer can handle the overflows for these types, initial
+ * values are chosen that map to the correct initial array indexes, but will
+ * result in overflows soon.
+ *
+ * BLK0_LPOS: The initial @head_lpos and @tail_lpos for data rings.
+ *            It is at index 0 and the lpos value is such that it
+ *            will overflow on the first wrap.
+ *
+ * DESC0_ID: The initial @head_id and @tail_id for the desc ring. It is
+ *           at the last index of the descriptor array and the ID value
+ *           is such that it will overflow on the second wrap.
+ */
+#define BLK0_LPOS(sz_bits)     (-(_DATA_SIZE(sz_bits)))
+#define DESC0_ID(ct_bits)      DESC_ID(-(_DESCS_COUNT(ct_bits) + 1))
+#define DESC0_SV(ct_bits)      (DESC_COMMITTED_MASK | DESC_REUSE_MASK | \
+                                       DESC0_ID(ct_bits))
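+
+/*
+ * Example (hypothetical, ct_bits = 2, i.e. 4 descriptors and a 30-bit ID
+ * space): DESC0_ID() yields 2^30 - 5, which maps to the last array index
+ * (3). The first reservation produces ID 2^30 - 4 at index 0; at the
+ * start of the second wrap the ID reaches 2^30 and wraps back to 0 within
+ * the masked ID space, exercising the overflow handling early.
+ */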
+
+#define DECLARE_PRINTKRB(name, descbits, avgtextbits, avgdictbits)     \
+char _##name##_text[1U << ((avgtextbits) + (descbits))]                \
+       __aligned(__alignof__(int));                                    \
+char _##name##_dict[1U << ((avgdictbits) + (descbits))]                \
+       __aligned(__alignof__(int));                                    \
+struct prb_desc _##name##_descs[_DESCS_COUNT(descbits)] = {            \
+       /* this will be the first record reserved by a writer */        \
+       [0] = {                                                         \
+                       .info = {                                       \
+                               /* incremented to 0 on first reservation */ \
+                               .seq = -(u64)_DESCS_COUNT(descbits),    \
+                               },                                      \
+               },                                                      \
+       /* the initial head and tail */                                 \
+       [_DESCS_COUNT(descbits) - 1] = {                                \
+                       .info = {                                       \
+                               /* minimal seq value during bootstrap */ \
+                               .seq = 0,                               \
+                               },                                      \
+                       /* reusable */                                  \
+                       .state_var      = ATOMIC_INIT(DESC0_SV(descbits)), \
+                       /* no associated data block */                  \
+                       .text_blk_lpos  = INVALID_BLK_LPOS,             \
+                       .dict_blk_lpos  = INVALID_BLK_LPOS,             \
+               },                                                      \
+       };                                                              \
+struct printk_ringbuffer name = {                                      \
+       .desc_ring = {                                                  \
+               .count_bits     = descbits,                             \
+               .descs          = &_##name##_descs[0],                  \
+               .head_id        = ATOMIC_INIT(DESC0_ID(descbits)),      \
+               .tail_id        = ATOMIC_INIT(DESC0_ID(descbits)),      \
+       },                                                              \
+       .text_data_ring = {                                             \
+               .size_bits      = (avgtextbits) + (descbits),           \
+               .data           = &_##name##_text[0],                   \
+               .head_lpos      = ATOMIC_LONG_INIT(BLK0_LPOS(           \
+                                       (avgtextbits) + (descbits))),   \
+               .tail_lpos      = ATOMIC_LONG_INIT(BLK0_LPOS(           \
+                                       (avgtextbits) + (descbits))),   \
+       },                                                              \
+       .dict_data_ring = {                                             \
+               .size_bits      = (avgdictbits) + (descbits),           \
+               .data           = &_##name##_dict[0],                   \
+               .head_lpos      = ATOMIC_LONG_INIT(BLK0_LPOS(           \
+                                       (avgdictbits) + (descbits))),   \
+               .tail_lpos      = ATOMIC_LONG_INIT(BLK0_LPOS(           \
+                                       (avgdictbits) + (descbits))),   \
+       },                                                              \
+       .fail                   = ATOMIC_LONG_INIT(0),                  \
+}
+
+/* Writer Interface */
+bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
+                struct printk_record *r);
+void prb_commit(struct prb_reserved_entry *e);
+
+#endif /* _KERNEL_PRINTK_RINGBUFFER_H */
-- 
2.20.1

