On 08/06, Peter Zijlstra wrote:
>
> On Tue, Aug 06, 2019 at 06:17:42PM +0200, Oleg Nesterov wrote:
>
> > but this will also wake all the pending readers up. Every reader will burn
> > CPU for no reason and likely delay the writer.
> >
> > In fact I'm afraid this can lead to live-lock, because every reader in turn
> > will call __percpu_up_read().
>
> I didn't really consider that case important; because of how heavy the
> write side is, it should be relatively rare.

Well yes, but down_read() should not stress the system.

However I was wrong, it is not that bad as I thought, I forgot that the
pending reader won't return from wait_event(sem->block) if another reader
comes.

Still I think we should try to avoid the unnecessary wakeups. See below.

> > How about 2 wait queues?
>
> That said, I can certainly try that.

and either way, with or without 2 queues, what do you think about the code
below?

This way the new reader does wake_up() only in the very unlikely case when
it races with the new writer which sets sem->block = 1 right after
this_cpu_inc().

Oleg.
-------------------------------------------------------------------------------

static inline void percpu_down_read(struct percpu_rw_semaphore *sem)
{
        might_sleep();
        rwsem_acquire_read(&sem->dep_map, 0, 0, _RET_IP_);

        preempt_disable();

        if (likely(rcu_sync_is_idle(&sem->rss)))
                __this_cpu_inc(*sem->read_count);
        else
                __percpu_down_read(sem, false);

        preempt_enable();
}

static inline void percpu_up_read(struct percpu_rw_semaphore *sem)
{
        rwsem_release(&sem->dep_map, 1, _RET_IP_);

        preempt_disable();

        if (likely(rcu_sync_is_idle(&sem->rss)))
                __this_cpu_dec(*sem->read_count);
        else
                __percpu_up_read(sem);

        preempt_enable();
}

// both called and return with preemption disabled

bool __percpu_down_read(struct percpu_rw_semaphore *sem, bool try)
{

        if (atomic_read_acquire(&sem->block)) {
again:
                preempt_enable();
                __wait_event(sem->waiters, !atomic_read_acquire(&sem->block));
                preempt_disable();
        }

        __this_cpu_inc(*sem->read_count);

        smp_mb();

        if (likely(!atomic_read_acquire(&sem->block)))
                return true;

        __percpu_up_read(sem);

        if (try)
                return false;

        goto again;
}

void __percpu_up_read(struct percpu_rw_semaphore *sem)
{
        smp_mb();

        __this_cpu_dec(*sem->read_count);

        wake_up(&sem->waiters);
}

Reply via email to