On Fri, 04/15 13:32, Paolo Bonzini wrote: > +/* The wait records are handled with a multiple-producer, single-consumer > + * lock-free queue. There cannot be two concurrent pop_waiter() calls > + * because pop_waiter() can only come when mutex->handoff is zero. This can > + * happen in three cases: > + * - in qemu_co_mutex_unlock, before the hand-off protocol has started. > + * In this case, qemu_co_mutex_lock will see mutex->handoff == 0 and > + * not take part in the handoff. > + * - in qemu_co_mutex_lock, if it steals the hand-off responsibility from > + * qemu_co_mutex_unlock. In this case, qemu_co_mutex_unlock will fail > + * the cmpxchg (it will see either 0 or the next sequence value) and > + * exit. The next hand-off cannot begin until qemu_co_mutex_lock has > + * woken up someone. > + * - in qemu_co_mutex_unlock, if it takes the hand-off token itself. > + * In this case another iterations starts with mutex->handoff == 0;
s/iterations/iteration/ > + * a concurrent qemu_co_mutex_lock will fail the cmpxchg, and > + * qemu_co_mutex_unlock will go back to case (1). > + * > + * The following functions manage this queue. > + */ > +typedef struct CoWaitRecord { > + Coroutine *co; > + QSLIST_ENTRY(CoWaitRecord) next; > +} CoWaitRecord; > + <snip> > @@ -132,12 +223,50 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex) > > trace_qemu_co_mutex_unlock_entry(mutex, self); > > - assert(mutex->locked == true); > + assert(mutex->locked); > assert(qemu_in_coroutine()); > > - mutex->locked = false; > - qemu_co_queue_next(&mutex->queue); > + if (atomic_fetch_dec(&mutex->locked) == 1) { > + /* No waiting qemu_co_mutex_lock(). Pfew, that was easy! */ > + return; > + } > + > + for (;;) { > + CoWaitRecord *to_wake = pop_waiter(mutex); > + unsigned our_handoff; > + > + if (to_wake) { > + Coroutine *co = to_wake->co; > + qemu_coroutine_wake(co->ctx, co); > + goto out; > + } > + > + /* Some concurrent lock() is in progress (we know this because of > + * count) but it hasn't yet put itself on the wait queue. Unlike OSv's lfmutex.cc, we don't seem to have count. Should the comment say "locked" instead? > + * Pick a sequence number for the handoff protocol (not 0). > + */ > + if (++mutex->sequence == 0) { > + mutex->sequence = 1; > + } > + > + our_handoff = mutex->sequence; > + atomic_mb_set(&mutex->handoff, our_handoff); > + if (!has_waiters(mutex)) { > + /* The concurrent lock has not added itself yet, so it > + * will be able to pick our handoff. > + */ > + goto out; > + } > + > + /* Try to do the handoff protocol ourselves; if somebody else has > + * already taken it, however, we're done and they're responsible. > + */ > + if (atomic_cmpxchg(&mutex->handoff, our_handoff, 0) != our_handoff) { > + goto out; > + } > + } > > +out: > trace_qemu_co_mutex_unlock_return(mutex, self); > } > > -- > 2.5.5 > >