On Thu 2019-08-08 00:32:26, John Ogness wrote:
> --- /dev/null
> +++ b/kernel/printk/numlist.c
> @@ -0,0 +1,375 @@
> +// SPDX-License-Identifier: GPL-2.0
> +
> +#include <linux/sched.h>
> +#include "numlist.h"

struct numlist is really special variant of a list. Let me to
do a short summary:

   + FIFO queue interface

   + nodes sequentially numbered

   + nodes referenced by ID instead pointers to avoid ABA problems
     + requires custom node() callback to get pointer for given ID

   + lockless access:
     + pushed nodes must not longer get modified by push() caller
     + pop() caller gets exclusive write access, except that they
       must modify ID first and do smp_wmb() later

   + pop() does not work:
     + tail node is "busy"
        + needs a custom callback that defines when a node is busy
     + tail is the last node
        + needed for lockless sequential numbering

I will start with one inevitable question ;-) Is it realistic to find
another user for this API, please?

I am not sure that all the indirections, caused by the generic API,
are worth the gain.


Well, the separate API makes sense anyway. I have some ideas that
might make it cleaner.

The barriers are because of validating the ID. Now we have:

        struct nl_node {
                unsigned long   seq;
                unsigned long   next_id;
        };

that is used in:

        struct prb_desc {
                /* private */
                atomic_long_t           id;
                struct dr_desc          desc;
                struct nl_node          list;
        };

What will happen when we move id from struct prb_desc into struct nl_node?

        struct nl_node {
                unsigned long   seq;
                atomic_long_t   id;
                unsigned long   next_id;
        };

        struct prb_desc {
                struct dr_desc          desc;
                struct nl_node          list;
        };


Then the "node" callback might just return the structure. It makes
perfect sense. struct nl_node is always static for a given id.

For the printk ringbuffer it would look like:

struct nl_node *prb_nl_get_node(unsigned long id, void *nl_user)
{
        struct printk_ringbuffer *rb = (struct printk_ringbuffer *)nl_user;
        struct prb_desc *d = to_desc(rb, id);

        return &d->list;
}

I would also hide the callback behind a generic wrapper:

struct nl_node *numlist_get_node(struct numlist *nl, unsigned long id)
{
        return nl->get_node(id, nl->user_data);
}


Then we could have nicely symetric and self contained barriers
in numlist_read():

bool numlist_read(struct numlist *nl, unsigned long id, unsigned long *seq,
                  unsigned long *next_id)
{
        struct nl_node *n;
        unsigned long cur_id;

        n = numlist_get_node(nl, id);
        if (!n)
                return false;

        /*
         * Make sure that seq and next_id values will be read
         * for the expected id.
         */
        cur_id = atomic_long_read_acquire(&n->id);
        if (cur_id != id)
                return false;

        if (seq) {
                *seq = n->seq;

        if (next_id)
                *next_id = n->next_id;
        }

        /*
         * Make sure that seq and next_id values were read for
         * the expected ID.
         */
        cur_id = atomic_long_read_release(&n->id);

        return cur_id == id;
}

numlist_push() might be the same, except the I would
remove several WRITE_ONCE as discussed in another mail:

void numlist_push(struct numlist *nl, struct nl_node *n)
{
        unsigned long head_id;
        unsigned long seq;
        unsigned long r;

        /* Setup the node to be a list terminator: next_id == id. */
        n->next_id = n->id;

        do {
                do {
                        head_id = atomic_long_read(&nl->head_id);
                } while (!numlist_read(nl, head_id, &seq, NULL));

                n->seq = seq + 1;

                /*
                 * This store_release() guarantees that @seq and @next are
                 * stored before the node with @id is visible to any popping
                 * writers.
                 * 
                 * It pairs with the acquire() when tail_id gets updated
                 * in headlist_pop();
                 */
        } while (atomic_long_cmpxchg_release(&nl->head_id, head_id, id) !=
                        head_id);

        n = nl->get_node(nl, head_id);

        /*
         * This barrier makes sure that nl->head_id already points to
         * the newly pushed node.
         *
         * It pairs with acquire when new id is written in numlist_pop().
         * It allows to pop() and reuse this node. It can not longer
         * be the last one.
         */
        smp_store_release(&n->next_id, id);
}

Then I would add a symetric callback that would generate ID for
a newly popped struct. It will allow to set new ID in the numlist
API and have the barriers symetric. Something like:

unsined long prb_new_node_id(unsigned long old_id, , void *nl_user)
{
        struct printk_ringbuffer *rb = (struct printk_ringbuffer *)nl_user;

        return id + DESCS_COUNT(rb);
}

Then we could hide it in

unsigned long numlist_get_new_id(struct numlist *nl, unsigned long id)
{
        return nl->get_new_id(id, nl->user_data);
}

and do

struct nl_node *numlist_pop(struct numlist *nl)
{
        struct nl_node *n;
        unsigned long tail_id;
        unsigned long next_id;
        unsigned long r;

        tail_id = atomic_long_read(&nl->tail_id);

        do {
                do {
                        tail_id = atomic_long_read(&nl->tail_id);
                } while (!numlist_read(nl, tail_id, NULL, &next_id));

                /* Make sure the node is not the only node on the list. */
                if (next_id == tail_id)
                        return NULL;

                /* Make sure the node is not busy. */
                if (nl->busy(tail_id, nl->busy_arg))
                        return NULL;

                /*
                 * Make sure that nl->tail_id is update before
                 * we start modyfying the popped node.
                 *
                 * It pairs with release() when head_id is
                 * pushed in numlist_push().
                 */
        } while (atomic_long_cmpxchg_acquire(&nl->tail_id,
                                        tail_id, next_id) !=
                  tail_id);

        /* Got exclusive write access to the node. */
        n = numlist_get_node(nl, tail_id);

        tail_id = numlist_get_new_id(tail_id, nl);
        /*
         * Make sure that we set new ID before we allow
         * more changes in user structure handled by this node.
         *
         * It pairs with release() barrier when the node is
         * pushed into the numlist again, gets linked to
         * the previous node and can't be modified anymore.
         * See numlist_push().
         */
        atomic_long_set_acquire(&d->id, atomic_long_read(&d->id) +
                                DESCS_COUNT(rb));

        return n;
}

I hope that it makes some sense. I feel exhausted. It is Friday
evening here. I just wanted to send it because it looked like the most
constructive idea that I had this week. And I wanted to send something
more positive ;-)

Best Regards,
Petr

Reply via email to