On 11/01, Paul E. McKenney wrote:
>
> Any news on exactly which patch constituted the reworking of this
> code some time back?

Again, I never sent a patch, I simply showed the new code (more than 2 years
ago ;), see below. I need to re-read our discussiong, but iirc your and Peter's
reviews were mostly positive.

The new implementation (and the state machine) is simpler, plus the new
__rcu_sync_enter(). It can be used instead of rcu_sync_enter_start() hack,
and by freeze_super() which currently need 3 GP's to take 3 percpu rwsems.

Oleg.
-------------------------------------------------------------------------------

#include <linux/rcu_sync.h>
#include <linux/sched.h>

#ifdef CONFIG_PROVE_RCU
#define __INIT_HELD(func)       .held = func,
#else
#define __INIT_HELD(func)
#endif

static const struct {
        void (*call)(struct rcu_head *, void (*)(struct rcu_head *));
        void (*wait)(void);     // TODO: remove this, see the comment in dtor
#ifdef CONFIG_PROVE_RCU
        int  (*held)(void);
#endif
} gp_ops[] = {
        [RCU_SYNC] = {
                .call = call_rcu,
                .wait = rcu_barrier,
                __INIT_HELD(rcu_read_lock_held)
        },
        [RCU_SCHED_SYNC] = {
                .call = call_rcu_sched,
                .wait = rcu_barrier_sched,
                __INIT_HELD(rcu_read_lock_sched_held)
        },
        [RCU_BH_SYNC] = {
                .call = call_rcu_bh,
                .wait = rcu_barrier_bh,
                __INIT_HELD(rcu_read_lock_bh_held)
        },
};

#define rss_lock        gp_wait.lock

enum { GP_IDLE = 0, GP_ENTER, GP_PASSED, GP_EXIT, GP_REPLAY };

#ifdef CONFIG_PROVE_RCU
void rcu_sync_lockdep_assert(struct rcu_sync *rsp)
{
        RCU_LOCKDEP_WARN(!gp_ops[rsp->gp_type].held(),
                         "suspicious rcu_sync_is_idle() usage");
}
#endif

void rcu_sync_init(struct rcu_sync *rsp, enum rcu_sync_type type)
{
        memset(rsp, 0, sizeof(*rsp));
        init_waitqueue_head(&rsp->gp_wait);
        rsp->gp_type = type;
}

static void rcu_sync_func(struct rcu_head *rcu);

static void rcu_sync_call(struct rcu_sync *rsp)
{
        // TODO: THIS IS SUBOPTIMAL. We want to call it directly
        // if rcu_blocking_is_gp() == T, but it has might_sleep().
        gp_ops[rsp->gp_type].call(&rsp->cb_head, rcu_sync_func);
}

static void rcu_sync_func(struct rcu_head *rcu)
{
        struct rcu_sync *rsp = container_of(rcu, struct rcu_sync, cb_head);
        unsigned long flags;

        BUG_ON(rsp->gp_state == GP_IDLE);
        BUG_ON(rsp->gp_state == GP_PASSED);

        spin_lock_irqsave(&rsp->rss_lock, flags);
        if (rsp->gp_count) {
                /*
                 * We're at least a GP after the first __rcu_sync_enter().
                 */
                rsp->gp_state = GP_PASSED;
                wake_up_locked(&rsp->gp_wait);
        } else if (rsp->gp_state == GP_REPLAY) {
                /*
                 * A new rcu_sync_exit() has happened; requeue the callback
                 * to catch a later GP.
                 */
                rsp->gp_state = GP_EXIT;
                rcu_sync_call(rsp);
        } else {
                /*
                 * We're at least a GP after the last rcu_sync_exit();
                 * eveybody will now have observed the write side critical
                 * section. Let 'em rip!.
                 *
                 * OR. ->gp_state can be still GP_ENTER if __rcu_sync_wait()
                 * wasn't called after __rcu_sync_enter(), abort.
                 */
                rsp->gp_state = GP_IDLE;
        }
        spin_unlock_irqrestore(&rsp->rss_lock, flags);
}

bool __rcu_sync_enter(struct rcu_sync *rsp)
{
        int gp_count, gp_state;

        spin_lock_irq(&rsp->rss_lock);
        gp_count = rsp->gp_count++;
        gp_state = rsp->gp_state;
        if (gp_state == GP_IDLE) {
                rsp->gp_state = GP_ENTER;
                rcu_sync_call(rsp);
        }
        spin_unlock_irq(&rsp->rss_lock);

        BUG_ON(gp_count != 0 && gp_state == GP_IDLE);
        BUG_ON(gp_count == 0 && gp_state == GP_PASSED);

        return gp_state < GP_PASSED;
}

void __rcu_sync_wait(struct rcu_sync *rsp)
{
        BUG_ON(rsp->gp_state == GP_IDLE);
        BUG_ON(rsp->gp_count == 0);

        wait_event(rsp->gp_wait, rsp->gp_state >= GP_PASSED);
}

void rcu_sync_enter(struct rcu_sync *rsp)
{
        if (__rcu_sync_enter(rsp))
                __rcu_sync_wait(rsp);
}

void rcu_sync_exit(struct rcu_sync *rsp)
{
        BUG_ON(rsp->gp_state == GP_IDLE);
        BUG_ON(rsp->gp_count == 0);

        spin_lock_irq(&rsp->rss_lock);
        if (!--rsp->gp_count) {
                if (rsp->gp_state == GP_PASSED) {
                        rsp->gp_state = GP_EXIT;
                        rcu_sync_call(rsp);
                } else if (rsp->gp_state == GP_EXIT) {
                        rsp->gp_state = GP_REPLAY;
                }
        }
        spin_unlock_irq(&rsp->rss_lock);
}

void rcu_sync_dtor(struct rcu_sync *rsp)
{
        int gp_state;

        BUG_ON(rsp->gp_count);
        BUG_ON(rsp->gp_state == GP_PASSED);

        spin_lock_irq(&rsp->rss_lock);
        if (rsp->gp_state == GP_REPLAY)
                rsp->gp_state = GP_EXIT;
        gp_state = rsp->gp_state;
        spin_unlock_irq(&rsp->rss_lock);

        // TODO: add another wake_up_locked() into rcu_sync_func(),
        // use wait_event + spin_lock_wait, remove gp_ops->wait().
        if (gp_state != GP_IDLE) {
                gp_ops[rsp->gp_type].wait();
                BUG_ON(rsp->gp_state != GP_IDLE);
        }
}


Reply via email to