On Thu, Nov 01, 2018 at 04:45:19PM +0100, Oleg Nesterov wrote: > On 11/01, Paul E. McKenney wrote: > > > > Any news on exactly which patch constituted the reworking of this > > code some time back? > > Again, I never sent a patch, I simply showed the new code (more than 2 years > ago ;), see below. I need to re-read our discussion, but iirc your and > Peter's > reviews were mostly positive.
I am glad that I didn't try to apply the various related patches I found, then. ;-) > The new implementation (and the state machine) is simpler, plus the new > __rcu_sync_enter(). It can be used instead of rcu_sync_enter_start() hack, > and by freeze_super() which currently need 3 GP's to take 3 percpu rwsems. > > Oleg. > ------------------------------------------------------------------------------- > > #include <linux/rcu_sync.h> > #include <linux/sched.h> > > #ifdef CONFIG_PROVE_RCU > #define __INIT_HELD(func) .held = func, > #else > #define __INIT_HELD(func) > #endif > > static const struct { > void (*call)(struct rcu_head *, void (*)(struct rcu_head *)); > void (*wait)(void); // TODO: remove this, see the comment in dtor > #ifdef CONFIG_PROVE_RCU > int (*held)(void); > #endif > } gp_ops[] = { > [RCU_SYNC] = { > .call = call_rcu, > .wait = rcu_barrier, > __INIT_HELD(rcu_read_lock_held) > }, > [RCU_SCHED_SYNC] = { > .call = call_rcu_sched, > .wait = rcu_barrier_sched, In my -rcu tree, these are now call_rcu and rcu_barrier, courtesy of RCU flavor consolidation. > __INIT_HELD(rcu_read_lock_sched_held) This remains the same. > }, > [RCU_BH_SYNC] = { > .call = call_rcu_bh, > .wait = rcu_barrier_bh, > __INIT_HELD(rcu_read_lock_bh_held) And same for these three. Thanx, Paul > }, > }; > > #define rss_lock gp_wait.lock > > enum { GP_IDLE = 0, GP_ENTER, GP_PASSED, GP_EXIT, GP_REPLAY }; > > #ifdef CONFIG_PROVE_RCU > void rcu_sync_lockdep_assert(struct rcu_sync *rsp) > { > RCU_LOCKDEP_WARN(!gp_ops[rsp->gp_type].held(), > "suspicious rcu_sync_is_idle() usage"); > } > #endif > > void rcu_sync_init(struct rcu_sync *rsp, enum rcu_sync_type type) > { > memset(rsp, 0, sizeof(*rsp)); > init_waitqueue_head(&rsp->gp_wait); > rsp->gp_type = type; > } > > static void rcu_sync_func(struct rcu_head *rcu); > > static void rcu_sync_call(struct rcu_sync *rsp) > { > // TODO: THIS IS SUBOPTIMAL. We want to call it directly > // if rcu_blocking_is_gp() == T, but it has might_sleep(). 
> gp_ops[rsp->gp_type].call(&rsp->cb_head, rcu_sync_func); > } > > static void rcu_sync_func(struct rcu_head *rcu) > { > struct rcu_sync *rsp = container_of(rcu, struct rcu_sync, cb_head); > unsigned long flags; > > BUG_ON(rsp->gp_state == GP_IDLE); > BUG_ON(rsp->gp_state == GP_PASSED); > > spin_lock_irqsave(&rsp->rss_lock, flags); > if (rsp->gp_count) { > /* > * We're at least a GP after the first __rcu_sync_enter(). > */ > rsp->gp_state = GP_PASSED; > wake_up_locked(&rsp->gp_wait); > } else if (rsp->gp_state == GP_REPLAY) { > /* > * A new rcu_sync_exit() has happened; requeue the callback > * to catch a later GP. > */ > rsp->gp_state = GP_EXIT; > rcu_sync_call(rsp); > } else { > /* > * We're at least a GP after the last rcu_sync_exit(); > * everybody will now have observed the write side critical > * section. Let 'em rip!. > * > * OR. ->gp_state can be still GP_ENTER if __rcu_sync_wait() > * wasn't called after __rcu_sync_enter(), abort. > */ > rsp->gp_state = GP_IDLE; > } > spin_unlock_irqrestore(&rsp->rss_lock, flags); > } > > bool __rcu_sync_enter(struct rcu_sync *rsp) > { > int gp_count, gp_state; > > spin_lock_irq(&rsp->rss_lock); > gp_count = rsp->gp_count++; > gp_state = rsp->gp_state; > if (gp_state == GP_IDLE) { > rsp->gp_state = GP_ENTER; > rcu_sync_call(rsp); > } > spin_unlock_irq(&rsp->rss_lock); > > BUG_ON(gp_count != 0 && gp_state == GP_IDLE); > BUG_ON(gp_count == 0 && gp_state == GP_PASSED); > > return gp_state < GP_PASSED; > } > > void __rcu_sync_wait(struct rcu_sync *rsp) > { > BUG_ON(rsp->gp_state == GP_IDLE); > BUG_ON(rsp->gp_count == 0); > > wait_event(rsp->gp_wait, rsp->gp_state >= GP_PASSED); > } > > void rcu_sync_enter(struct rcu_sync *rsp) > { > if (__rcu_sync_enter(rsp)) > __rcu_sync_wait(rsp); > } > > void rcu_sync_exit(struct rcu_sync *rsp) > { > BUG_ON(rsp->gp_state == GP_IDLE); > BUG_ON(rsp->gp_count == 0); > > spin_lock_irq(&rsp->rss_lock); > if (!--rsp->gp_count) { > if (rsp->gp_state == GP_PASSED) { > rsp->gp_state = GP_EXIT; > 
rcu_sync_call(rsp); > } else if (rsp->gp_state == GP_EXIT) { > rsp->gp_state = GP_REPLAY; > } > } > spin_unlock_irq(&rsp->rss_lock); > } > > void rcu_sync_dtor(struct rcu_sync *rsp) > { > int gp_state; > > BUG_ON(rsp->gp_count); > BUG_ON(rsp->gp_state == GP_PASSED); > > spin_lock_irq(&rsp->rss_lock); > if (rsp->gp_state == GP_REPLAY) > rsp->gp_state = GP_EXIT; > gp_state = rsp->gp_state; > spin_unlock_irq(&rsp->rss_lock); > > // TODO: add another wake_up_locked() into rcu_sync_func(), > // use wait_event + spin_lock_wait, remove gp_ops->wait(). > if (gp_state != GP_IDLE) { > gp_ops[rsp->gp_type].wait(); > BUG_ON(rsp->gp_state != GP_IDLE); > } > } > >