On 11/01, Paul E. McKenney wrote: > > Any news on exactly which patch constituted the reworking of this > code some time back?
Again, I never sent a patch, I simply showed the new code (more than 2 years ago ;), see below. I need to re-read our discussiong, but iirc your and Peter's reviews were mostly positive. The new implementation (and the state machine) is simpler, plus the new __rcu_sync_enter(). It can be used instead of rcu_sync_enter_start() hack, and by freeze_super() which currently need 3 GP's to take 3 percpu rwsems. Oleg. ------------------------------------------------------------------------------- #include <linux/rcu_sync.h> #include <linux/sched.h> #ifdef CONFIG_PROVE_RCU #define __INIT_HELD(func) .held = func, #else #define __INIT_HELD(func) #endif static const struct { void (*call)(struct rcu_head *, void (*)(struct rcu_head *)); void (*wait)(void); // TODO: remove this, see the comment in dtor #ifdef CONFIG_PROVE_RCU int (*held)(void); #endif } gp_ops[] = { [RCU_SYNC] = { .call = call_rcu, .wait = rcu_barrier, __INIT_HELD(rcu_read_lock_held) }, [RCU_SCHED_SYNC] = { .call = call_rcu_sched, .wait = rcu_barrier_sched, __INIT_HELD(rcu_read_lock_sched_held) }, [RCU_BH_SYNC] = { .call = call_rcu_bh, .wait = rcu_barrier_bh, __INIT_HELD(rcu_read_lock_bh_held) }, }; #define rss_lock gp_wait.lock enum { GP_IDLE = 0, GP_ENTER, GP_PASSED, GP_EXIT, GP_REPLAY }; #ifdef CONFIG_PROVE_RCU void rcu_sync_lockdep_assert(struct rcu_sync *rsp) { RCU_LOCKDEP_WARN(!gp_ops[rsp->gp_type].held(), "suspicious rcu_sync_is_idle() usage"); } #endif void rcu_sync_init(struct rcu_sync *rsp, enum rcu_sync_type type) { memset(rsp, 0, sizeof(*rsp)); init_waitqueue_head(&rsp->gp_wait); rsp->gp_type = type; } static void rcu_sync_func(struct rcu_head *rcu); static void rcu_sync_call(struct rcu_sync *rsp) { // TODO: THIS IS SUBOPTIMAL. We want to call it directly // if rcu_blocking_is_gp() == T, but it has might_sleep(). gp_ops[rsp->gp_type].call(&rsp->cb_head, rcu_sync_func); } static void rcu_sync_func(struct rcu_head *rcu) { struct rcu_sync *rsp = container_of(rcu, struct rcu_sync, cb_head); unsigned long flags; BUG_ON(rsp->gp_state == GP_IDLE); BUG_ON(rsp->gp_state == GP_PASSED); spin_lock_irqsave(&rsp->rss_lock, flags); if (rsp->gp_count) { /* * We're at least a GP after the first __rcu_sync_enter(). */ rsp->gp_state = GP_PASSED; wake_up_locked(&rsp->gp_wait); } else if (rsp->gp_state == GP_REPLAY) { /* * A new rcu_sync_exit() has happened; requeue the callback * to catch a later GP. */ rsp->gp_state = GP_EXIT; rcu_sync_call(rsp); } else { /* * We're at least a GP after the last rcu_sync_exit(); * eveybody will now have observed the write side critical * section. Let 'em rip!. * * OR. ->gp_state can be still GP_ENTER if __rcu_sync_wait() * wasn't called after __rcu_sync_enter(), abort. */ rsp->gp_state = GP_IDLE; } spin_unlock_irqrestore(&rsp->rss_lock, flags); } bool __rcu_sync_enter(struct rcu_sync *rsp) { int gp_count, gp_state; spin_lock_irq(&rsp->rss_lock); gp_count = rsp->gp_count++; gp_state = rsp->gp_state; if (gp_state == GP_IDLE) { rsp->gp_state = GP_ENTER; rcu_sync_call(rsp); } spin_unlock_irq(&rsp->rss_lock); BUG_ON(gp_count != 0 && gp_state == GP_IDLE); BUG_ON(gp_count == 0 && gp_state == GP_PASSED); return gp_state < GP_PASSED; } void __rcu_sync_wait(struct rcu_sync *rsp) { BUG_ON(rsp->gp_state == GP_IDLE); BUG_ON(rsp->gp_count == 0); wait_event(rsp->gp_wait, rsp->gp_state >= GP_PASSED); } void rcu_sync_enter(struct rcu_sync *rsp) { if (__rcu_sync_enter(rsp)) __rcu_sync_wait(rsp); } void rcu_sync_exit(struct rcu_sync *rsp) { BUG_ON(rsp->gp_state == GP_IDLE); BUG_ON(rsp->gp_count == 0); spin_lock_irq(&rsp->rss_lock); if (!--rsp->gp_count) { if (rsp->gp_state == GP_PASSED) { rsp->gp_state = GP_EXIT; rcu_sync_call(rsp); } else if (rsp->gp_state == GP_EXIT) { rsp->gp_state = GP_REPLAY; } } spin_unlock_irq(&rsp->rss_lock); } void rcu_sync_dtor(struct rcu_sync *rsp) { int gp_state; BUG_ON(rsp->gp_count); BUG_ON(rsp->gp_state == GP_PASSED); spin_lock_irq(&rsp->rss_lock); if (rsp->gp_state == GP_REPLAY) rsp->gp_state = GP_EXIT; gp_state = rsp->gp_state; spin_unlock_irq(&rsp->rss_lock); // TODO: add another wake_up_locked() into rcu_sync_func(), // use wait_event + spin_lock_wait, remove gp_ops->wait(). if (gp_state != GP_IDLE) { gp_ops[rsp->gp_type].wait(); BUG_ON(rsp->gp_state != GP_IDLE); } }