The patch documents and updates the memory barriers in ipc/sem.c: - Document that the WRITE_ONCE for q->status relies on a barrier inside wake_q_add().
- Read q->status using READ_ONCE+smp_acquire__after_ctrl_dep() as the pair for the barrier inside wake_q_add(). - Remove READ_ONCE & WRITE_ONCE for the situations where spinlocks provide exclusion. - Add comments to all barriers, and mention the rules in the block regarding locking. Signed-off-by: Manfred Spraul <manf...@colorfullife.com> Cc: Waiman Long <long...@redhat.com> Cc: Davidlohr Bueso <d...@stgolabs.net> --- ipc/sem.c | 64 ++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 51 insertions(+), 13 deletions(-) diff --git a/ipc/sem.c b/ipc/sem.c index ec97a7072413..53d970c4e60d 100644 --- a/ipc/sem.c +++ b/ipc/sem.c @@ -205,7 +205,9 @@ static int sysvipc_sem_proc_show(struct seq_file *s, void *it); * * Memory ordering: * Most ordering is enforced by using spin_lock() and spin_unlock(). - * The special case is use_global_lock: + * + * Exceptions: + * 1) use_global_lock: * Setting it from non-zero to 0 is a RELEASE, this is ensured by * using smp_store_release(). * Testing if it is non-zero is an ACQUIRE, this is ensured by using @@ -214,6 +216,24 @@ static int sysvipc_sem_proc_show(struct seq_file *s, void *it); * this smp_load_acquire(), this is guaranteed because the smp_load_acquire() * is inside a spin_lock() and after a write from 0 to non-zero a * spin_lock()+spin_unlock() is done. + * + * 2) queue.status: + * Initialization is done while holding sem_lock(), so no further barrier is + * required. + * Setting it to a result code is a RELEASE, this is ensured by both the + * barrier inside wake_q_add() (for case a) and while holding sem_lock() + * (for case b). + * The ACQUIRE when reading the result code without holding sem_lock() is + * achieved by using READ_ONCE() + smp_acquire__after_ctrl_dep(). + * (case a above). 
+ * Reading the result code while holding sem_lock() needs no further barriers, + * the locks inside sem_lock() enforce ordering (case b above) + * + * 3) current->state: + * current->state is set to TASK_INTERRUPTIBLE while holding sem_lock(). + * The wakeup is handled using the wake_q infrastructure. wake_q wakeups may + * happen immediately after calling wake_q_add. As wake_q_add() is called + * when holding sem_lock(), no further barriers are required. */ #define sc_semmsl sem_ctls[0] @@ -766,13 +786,21 @@ static int perform_atomic_semop(struct sem_array *sma, struct sem_queue *q) static inline void wake_up_sem_queue_prepare(struct sem_queue *q, int error, struct wake_q_head *wake_q) { + /* + * When the wakeup is performed, q->sleeper->state is read and later + * set to TASK_RUNNING. This may happen at any time, even before + * wake_q_add() returns. Memory ordering for q->sleeper->state is + * enforced by sem_lock(): we own sem_lock now (that was the ACQUIRE), + * and q->sleeper wrote q->sleeper->state before calling sem_unlock() + * (->RELEASE). + */ wake_q_add(wake_q, q->sleeper); /* - * Rely on the above implicit barrier, such that we can - * ensure that we hold reference to the task before setting - * q->status. Otherwise we could race with do_exit if the - * task is awoken by an external event before calling - * wake_up_process(). + * Memory barrier pairing: + * case a: The barrier inside wake_q_add() pairs with + * READ_ONCE(q->status) + smp_acquire__after_ctrl_dep() in + * do_semtimedop(). + * case b: nothing, ordering is enforced by the locks in sem_lock(). 
+ */ WRITE_ONCE(q->status, error); } @@ -2148,9 +2176,11 @@ static long do_semtimedop(int semid, struct sembuf __user *tsops, } do { - WRITE_ONCE(queue.status, -EINTR); + /* memory ordering ensured by the lock in sem_lock() */ + queue.status = EINTR; queue.sleeper = current; + /* memory ordering is ensured by the lock in sem_lock() */ __set_current_state(TASK_INTERRUPTIBLE); sem_unlock(sma, locknum); rcu_read_unlock(); @@ -2174,12 +2204,16 @@ static long do_semtimedop(int semid, struct sembuf __user *tsops, error = READ_ONCE(queue.status); if (error != -EINTR) { /* - * User space could assume that semop() is a memory - * barrier: Without the mb(), the cpu could - * speculatively read in userspace stale data that was - * overwritten by the previous owner of the semaphore. + * Memory barrier for queue.status, case a): + * The smp_acquire__after_ctrl_dep(), together with the + * READ_ONCE() above pairs with the barrier inside + * wake_q_add(). + * The barrier protects user space, too: User space may + * assume that all data from the CPU that did the wakeup + * semop() is visible on the wakee CPU when the sleeping + * semop() returns. */ - smp_mb(); + smp_acquire__after_ctrl_dep(); goto out_free; } @@ -2189,7 +2223,11 @@ static long do_semtimedop(int semid, struct sembuf __user *tsops, if (!ipc_valid_object(&sma->sem_perm)) goto out_unlock_free; - error = READ_ONCE(queue.status); + /* + * No necessity for any barrier: + * We are protected by sem_lock() (case b) + */ + error = queue.status; /* * If queue.status != -EINTR we are woken up by another process. -- 2.21.0