> Date: Sat, 12 Mar 2022 10:42:18 +0000
> From: Taylor R Campbell <riastr...@netbsd.org>
>
> - The one-word difference is immaterial for ordering atomic-r/m/w and
>   then load/store (or, equivalently, ll/sc and then load/store) -- so
>   the change doesn't affect mutex_enter-type operations implemented
>   with, e.g., atomic_cas.

It turns out this argument I made is wrong!  Although not for any of
the reasons discussed up-thread.

The _conclusion_ is correct that the proposed change (w/rw to r/rw)
doesn't affect mutex_enter-type operations with atomic_cas -- doing
atomic-r/m/w then membar-r/rw makes a load-acquire operation which is
sufficient for locking (provided unlocking does store-release as
usual) to serialize the critical sections.

So r/rw suffices for the Solaris language of roughly `lock acquisition
before load/store', whether `lock acquisition' is an atomic
test-and-set, compare-and-swap, ll/sc, or even crazier schemes like
Dekker's algorithm -- noting that Dekker's algorithm requires an
_additional_ store-before-load barrier _inside_ `lock acquisition'.
And r/rw is often much cheaper than w/rw.

But the _premise_ I stated above is wrong, because atomic-r/m/w then
membar-r/rw is _sometimes_ different from atomic-r/m/w then
membar-w/rw.  This can be exhibited through Dekker's algorithm, which
(for two CPUs) is:

    int waiting[2];
    int turn;

    lock()
    {
    top:
        waiting[curcpu()] = 1;
        membar(w/r);
        while (waiting[1 - curcpu()]) {
            if (turn != curcpu()) {
                waiting[curcpu()] = 0;
                while (turn != curcpu())
                    continue;
                goto top;
            }
        }
        membar(r/rw);   /* `lock acquisition then load/store' */
    }

    unlock()
    {
        membar(rw/w);   /* `load/store then lock release' */
        turn = 1 - curcpu();
        waiting[curcpu()] = 0;
    }

For the first two lines of lock(), we can obviously use an atomic_swap
instead of a simple store, and the stronger membar(w/rw) instead of
membar(w/r):

    lock()
    {
    top:
        (void)atomic_swap(&waiting[curcpu()], 1);
        membar(w/rw);
        ...

This is an atomic-r/m/w then membar(w/rw).  Dekker's algorithm
notoriously _requires_ store-before-load ordering here.  So replacing
the membar(w/rw) by membar(r/rw) can't be correct, even though it
immediately follows an atomic-r/m/w.

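To illustrate why store-before-load ordering matters in the first two
lines of lock(), here is a minimal store-buffering sketch in portable
C11.  It is not part of the original program.  It assumes that
atomic_thread_fence(memory_order_acquire) is a reasonable stand-in for
membar(r/rw) and that atomic_thread_fence(memory_order_seq_cst) stands
in for a barrier that includes w/r; the names sb_both_zero, cpu0, cpu1
and the round count N are hypothetical.  In each round both threads
swap 1 into their own flag, issue the fence, and load the other flag.
If both loads return 0 in the same round, both CPUs would have walked
straight past the while loop in Dekker's lock().

```c
#include <pthread.h>
#include <stdatomic.h>
#include <stddef.h>

#define N 100000                /* number of independent rounds (arbitrary) */

static atomic_int x[N], y[N];   /* one pair of flags per round */
static int r0[N], r1[N];        /* what each thread observed in each round */
static int strong_fence;        /* written before the threads are created */

static void
fence(void)
{
    if (strong_fence)
        /* assumed analogue of a barrier that includes w/r */
        atomic_thread_fence(memory_order_seq_cst);
    else
        /* assumed analogue of membar(r/rw) */
        atomic_thread_fence(memory_order_acquire);
}

static void *
cpu0(void *arg)
{
    (void)arg;
    for (int i = 0; i < N; i++) {
        /* atomic-r/m/w, then barrier, then load of the other flag */
        (void)atomic_exchange_explicit(&x[i], 1, memory_order_relaxed);
        fence();
        r0[i] = atomic_load_explicit(&y[i], memory_order_relaxed);
    }
    return NULL;
}

static void *
cpu1(void *arg)
{
    (void)arg;
    for (int i = 0; i < N; i++) {
        (void)atomic_exchange_explicit(&y[i], 1, memory_order_relaxed);
        fence();
        r1[i] = atomic_load_explicit(&x[i], memory_order_relaxed);
    }
    return NULL;
}

/* Returns the number of rounds in which neither thread saw the other's
 * store, or -1 if a thread could not be created. */
int
sb_both_zero(int strong)
{
    pthread_t t0, t1;
    int count = 0;

    strong_fence = strong;
    for (int i = 0; i < N; i++) {
        atomic_store_explicit(&x[i], 0, memory_order_relaxed);
        atomic_store_explicit(&y[i], 0, memory_order_relaxed);
    }
    if (pthread_create(&t0, NULL, cpu0, NULL) != 0)
        return -1;
    if (pthread_create(&t1, NULL, cpu1, NULL) != 0) {
        pthread_join(t0, NULL);
        return -1;
    }
    pthread_join(t0, NULL);
    pthread_join(t1, NULL);
    for (int i = 0; i < N; i++)
        if (r0[i] == 0 && r1[i] == 0)
            count++;
    return count;
}
```

With strong = 1, the C11 rules for seq_cst fences guarantee a result
of 0.  With strong = 0 a nonzero result is permitted but by no means
guaranteed: the two threads are not kept in lockstep, so the weak
variant may well report 0 even on weakly ordered hardware, and on x86
the swap itself is expected to act as a full barrier.
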
The difference is exhibited by the attached program when run on an rpi4 -- it should print 40000000 if lock/unlock are correct, but it sometimes prints a slightly lower count because Dekker's algorithm fails to achieve mutual exclusion with DMB ISHLD (that is, r/rw) instead of DMB ISH (that is, rw/rw).
#include <sys/atomic.h>

#include <assert.h>
#include <err.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>

/* With BROKEN defined, lock() uses the too-weak r/rw barrier. */
#undef BROKEN
#define BROKEN 1

#if defined(__amd64__)
#define membar_acquire() asm volatile("" ::: "memory")
#define membar_release() asm volatile("" ::: "memory")
#ifdef BROKEN /* not really broken because atomic_swap implies seq_cst */
#define membar_dekker() asm volatile("" ::: "memory")
#else
#define membar_dekker() asm volatile("mfence" ::: "memory")
#endif
#define noop() asm volatile("pause" ::: "memory")
#elif defined(__aarch64__)
#define membar_acquire() asm volatile("dmb ishld" ::: "memory")
#define membar_release() asm volatile("dmb ish" ::: "memory")
#ifdef BROKEN
#define membar_dekker() asm volatile("dmb ishld" ::: "memory")
#else
#define membar_dekker() asm volatile("dmb ish" ::: "memory")
#endif
#define noop() asm volatile("yield" ::: "memory")
#else
#error unsupported architecture
#endif

/* Pad each flag to 128 bytes so the two flags never share a cache line. */
volatile struct {
    unsigned v;
    uint8_t pad[128 - sizeof(unsigned)];
} waiting[2] __aligned((128));
volatile unsigned turn;
volatile unsigned counter;

static void
lock(unsigned me)
{

top:
    atomic_swap_uint(&waiting[me].v, 1);
    membar_dekker();    /* must order the store before the load below */
    while (waiting[1 - me].v) {
        if (turn != me) {
            waiting[me].v = 0;
            while (turn != me)
                continue;
            goto top;
        }
    }
    membar_acquire();
}

static void
unlock(unsigned me)
{

    membar_release();
    turn = 1 - me;
    waiting[me].v = 0;
}

static void *
thread(void *cookie)
{
    unsigned me = (uintptr_t)cookie;
    unsigned i;

    for (i = 10000000; i-- > 0;) {
        lock(me);
        /* Non-atomic increments: updates are lost if exclusion fails. */
        counter++;
        noop();
        counter++;
        unlock(me);
    }

    return NULL;
}

int
main(void)
{
    pthread_t t[2];
    unsigned i;
    int error;

    for (i = 0; i < 2; i++) {
        error = pthread_create(&t[i], NULL, &thread,
            (void *)(uintptr_t)i);
        if (error)
            errc(1, error, "pthread_create");
    }
    for (i = 0; i < 2; i++) {
        error = pthread_join(t[i], NULL);
        if (error)
            errc(1, error, "pthread_join");
    }
    printf("%u\n", counter);
    fflush(stdout);
    return ferror(stdout);
}
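
For readers without NetBSD's <sys/atomic.h> or an amd64/aarch64
machine, the non-broken lock can be approximated with C11
<stdatomic.h>.  This is only a sketch under stated assumptions, not
the program that was run on the rpi4: it assumes the seq_cst fence is
an adequate replacement for membar_dekker(), and that acquire and
release fences replace membar_acquire() and membar_release().  The
names dekker_lock, dekker_unlock, worker and dekker_demo are
hypothetical, and sched_yield() is added to the spin loops only to
keep the demo tolerable on a machine with a single CPU.

```c
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stddef.h>
#include <stdint.h>

static atomic_uint waiting[2];
static atomic_uint turn;
static unsigned long counter;       /* plain variable, protected by the lock */
static unsigned long iterations;    /* set before the threads are created */

static void
dekker_lock(unsigned me)
{
top:
    (void)atomic_exchange_explicit(&waiting[me], 1, memory_order_relaxed);
    /* Store-before-load: an acquire fence here would not be enough. */
    atomic_thread_fence(memory_order_seq_cst);
    while (atomic_load_explicit(&waiting[1 - me], memory_order_relaxed)) {
        if (atomic_load_explicit(&turn, memory_order_relaxed) != me) {
            /* Not our turn: back off and let the other side in. */
            atomic_store_explicit(&waiting[me], 0, memory_order_relaxed);
            while (atomic_load_explicit(&turn, memory_order_relaxed) != me)
                sched_yield();
            goto top;
        }
        sched_yield();
    }
    /* `lock acquisition then load/store' */
    atomic_thread_fence(memory_order_acquire);
}

static void
dekker_unlock(unsigned me)
{
    /* `load/store then lock release' */
    atomic_thread_fence(memory_order_release);
    atomic_store_explicit(&turn, 1 - me, memory_order_relaxed);
    atomic_store_explicit(&waiting[me], 0, memory_order_relaxed);
}

static void *
worker(void *cookie)
{
    unsigned me = (unsigned)(uintptr_t)cookie;

    for (unsigned long i = 0; i < iterations; i++) {
        dekker_lock(me);
        counter++;
        dekker_unlock(me);
    }
    return NULL;
}

/* Runs two threads for n iterations each and returns the final counter,
 * or 0 if a thread could not be created. */
unsigned long
dekker_demo(unsigned long n)
{
    pthread_t t[2];

    atomic_store(&waiting[0], 0);
    atomic_store(&waiting[1], 0);
    atomic_store(&turn, 0);
    counter = 0;
    iterations = n;
    if (pthread_create(&t[0], NULL, worker, (void *)(uintptr_t)0) != 0)
        return 0;
    if (pthread_create(&t[1], NULL, worker, (void *)(uintptr_t)1) != 0) {
        pthread_join(t[0], NULL);
        return 0;
    }
    pthread_join(t[0], NULL);
    pthread_join(t[1], NULL);
    return counter;
}
```

With the seq_cst fence in place, dekker_demo(n) should return exactly
2*n.  Weakening that first fence to memory_order_acquire is the C11
counterpart of the BROKEN case above and turns the increment into a
data race, so it is left out of the sketch.
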