On 7.06.19 at 13:52, Paul E. McKenney wrote:
> On Thu, Jun 06, 2019 at 04:52:18PM +0300, Nikolay Borisov wrote:
>> A (D)ouble (R)eader (W)riter lock is a locking primitive that allows
>> to have multiple readers or multiple writers but not multiple readers
>> and writers holding it concurrently. The code is factored out from
>> the existing open-coded locking scheme used to exclude pending
>> snapshots from nocow writers and vice-versa. Current implementation
>> actually favors Readers (that is snapshot creators) over writers (nocow
>> writers of the filesystem).
>>
>> Signed-off-by: Nikolay Borisov <[email protected]>
> 
> A preliminary question...
> 
> What prevents the following sequence of events from happening?
> 
> o     btrfs_drw_write_lock() invokes btrfs_drw_try_write_lock(),
>       which sees that lock->readers is zero and thus executes
>       percpu_counter_inc(&lock->writers).
> 
> o     btrfs_drw_read_lock() increments lock->readers, does the
>       smp_mb__after_atomic(), and then does the wait_event().
>       Because btrfs_drw_try_write_lock() incremented its CPU's
>       lock->writers, the sum is the value one, so it blocks.
> 
> o     btrfs_drw_try_write_lock() checks lock->readers, sees that
>       it is now nonzero, and thus invokes btrfs_drw_read_unlock()
>       (which decrements the current CPU's counter, so that a future
>       sum would get zero), and returns false.

The btrfs_drw_read_unlock() call there should have been
btrfs_drw_write_unlock(), my bad. Filipe already pointed that out and
I've fixed it.

The idea here is that if a reader arrives after we've incremented our
percpu counter, it will block. The writer then sees the nonzero reader
count and invokes btrfs_drw_write_unlock(), which decrements the percpu
counter and wakes up the reader that is blocked on pending_readers.
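
To make that argument concrete, here is a minimal userspace sketch that
replays the interleaving from your mail step by step. It is only a model,
not the kernel code: a single C11 atomic_int stands in for the percpu
counter, the wait_event() conditions are reduced to plain predicates, and
every name in it (drw_model, try_write_begin, try_write_finish,
replay_interleaving, ...) is made up for illustration. It assumes the
fixed version in which the failed trylock backs out via the write unlock.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Userspace model only: one atomic int stands in for the percpu counter. */
struct drw_model {
	atomic_int readers;
	atomic_int writers;
};

/* Condition a reader would sleep on in wait_event(pending_readers, ...). */
static bool reader_may_proceed(struct drw_model *l)
{
	return atomic_load(&l->writers) == 0;
}

/* Condition a writer would sleep on in wait_event(pending_writers, ...). */
static bool writer_may_proceed(struct drw_model *l)
{
	return atomic_load(&l->readers) == 0;
}

/* First half of the trylock: early check, then announce the writer. */
static bool try_write_begin(struct drw_model *l)
{
	if (atomic_load(&l->readers))
		return false;
	atomic_fetch_add(&l->writers, 1);
	return true;
}

/* Second half: recheck readers, back out with the *write* unlock. */
static bool try_write_finish(struct drw_model *l)
{
	if (atomic_load(&l->readers)) {
		/* Models btrfs_drw_write_unlock(): drop count, wake readers. */
		atomic_fetch_sub(&l->writers, 1);
		return false;
	}
	return true;
}

struct replay_result {
	bool reader_blocked_first;          /* reader saw writers != 0 */
	bool try_lock_succeeded;            /* outcome of the trylock */
	bool reader_runs_after_backout;     /* reader condition after wakeup */
	bool writer_runs_after_read_unlock; /* writer condition at the end */
};

/* Replays the quoted sequence of events in a single thread. */
static struct replay_result replay_interleaving(void)
{
	struct drw_model l;
	struct replay_result r;

	atomic_init(&l.readers, 0);
	atomic_init(&l.writers, 0);

	/* Writer: sees readers == 0 and increments writers. */
	bool announced = try_write_begin(&l);
	assert(announced);

	/* Reader: increments readers, then finds writers == 1. */
	atomic_fetch_add(&l.readers, 1);
	r.reader_blocked_first = !reader_may_proceed(&l);

	/* Writer: rechecks, sees the reader, backs out. */
	r.try_lock_succeeded = try_write_finish(&l);

	/* The wakeup makes the reader re-evaluate its condition. */
	r.reader_runs_after_backout = reader_may_proceed(&l);

	/* Writer waits for readers to drain; reader eventually unlocks. */
	assert(!writer_may_proceed(&l));
	atomic_fetch_sub(&l.readers, 1);
	r.writer_runs_after_read_unlock = writer_may_proceed(&l);

	return r;
}
```

Calling replay_interleaving() walks through the same order of events as
the quoted scenario. The model says nothing about memory ordering or
lost wakeups in the real implementation; it only illustrates that the
writer count returns to zero once the failed trylock backs out through
the write-side unlock.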

> 
> o     btrfs_drw_write_lock() therefore does its wait_event().
>       Because lock->readers is nonzero, it blocks.
> 
> o     Both tasks are now blocked.  In the absence of future calls
>       to these functions (and perhaps even given such future calls),
>       we have deadlock.
> 
> So what am I missing here?
> 
>                                                       Thanx, Paul
> 
>> ---
>>  fs/btrfs/Makefile   |  2 +-
>>  fs/btrfs/drw_lock.c | 71 +++++++++++++++++++++++++++++++++++++++++++++
>>  fs/btrfs/drw_lock.h | 23 +++++++++++++++
>>  3 files changed, 95 insertions(+), 1 deletion(-)
>>  create mode 100644 fs/btrfs/drw_lock.c
>>  create mode 100644 fs/btrfs/drw_lock.h
>>
>> diff --git a/fs/btrfs/Makefile b/fs/btrfs/Makefile
>> index ca693dd554e9..dc60127791e6 100644
>> --- a/fs/btrfs/Makefile
>> +++ b/fs/btrfs/Makefile
>> @@ -10,7 +10,7 @@ btrfs-y += super.o ctree.o extent-tree.o print-tree.o root-tree.o dir-item.o \
>>         export.o tree-log.o free-space-cache.o zlib.o lzo.o zstd.o \
>>         compression.o delayed-ref.o relocation.o delayed-inode.o scrub.o \
>>         reada.o backref.o ulist.o qgroup.o send.o dev-replace.o raid56.o \
>> -       uuid-tree.o props.o free-space-tree.o tree-checker.o
>> +       uuid-tree.o props.o free-space-tree.o tree-checker.o drw_lock.o
>>  
>>  btrfs-$(CONFIG_BTRFS_FS_POSIX_ACL) += acl.o
>>  btrfs-$(CONFIG_BTRFS_FS_CHECK_INTEGRITY) += check-integrity.o
>> diff --git a/fs/btrfs/drw_lock.c b/fs/btrfs/drw_lock.c
>> new file mode 100644
>> index 000000000000..9681bf7544be
>> --- /dev/null
>> +++ b/fs/btrfs/drw_lock.c
>> @@ -0,0 +1,71 @@
>> +#include "drw_lock.h"
>> +#include "ctree.h"
>> +
>> +void btrfs_drw_lock_init(struct btrfs_drw_lock *lock)
>> +{
>> +    atomic_set(&lock->readers, 0);
>> +    percpu_counter_init(&lock->writers, 0, GFP_KERNEL);
>> +    init_waitqueue_head(&lock->pending_readers);
>> +    init_waitqueue_head(&lock->pending_writers);
>> +}
>> +
>> +void btrfs_drw_lock_destroy(struct btrfs_drw_lock *lock)
>> +{
>> +    percpu_counter_destroy(&lock->writers);
>> +}
>> +
>> +bool btrfs_drw_try_write_lock(struct btrfs_drw_lock *lock)
>> +{
>> +    if (atomic_read(&lock->readers))
>> +            return false;
>> +
>> +    percpu_counter_inc(&lock->writers);
>> +
>> +    /*
>> +     * Ensure writers count is updated before we check for
>> +     * pending readers
>> +     */
>> +    smp_mb();
>> +    if (atomic_read(&lock->readers)) {
>> +            btrfs_drw_read_unlock(lock);
>> +            return false;
>> +    }
>> +
>> +    return true;
>> +}
>> +
>> +void btrfs_drw_write_lock(struct btrfs_drw_lock *lock)
>> +{
>> +    while(true) {
>> +            if (btrfs_drw_try_write_lock(lock))
>> +                    return;
>> +            wait_event(lock->pending_writers, !atomic_read(&lock->readers));
>> +    }
>> +}
>> +
>> +void btrfs_drw_write_unlock(struct btrfs_drw_lock *lock)
>> +{
>> +    percpu_counter_dec(&lock->writers);
>> +    cond_wake_up(&lock->pending_readers);
>> +}
>> +
>> +void btrfs_drw_read_lock(struct btrfs_drw_lock *lock)
>> +{
>> +    atomic_inc(&lock->readers);
>> +    smp_mb__after_atomic();
>> +
>> +    wait_event(lock->pending_readers,
>> +               percpu_counter_sum(&lock->writers) == 0);
>> +}
>> +
>> +void btrfs_drw_read_unlock(struct btrfs_drw_lock *lock)
>> +{
>> +    /*
>> +     * Atomic RMW operations imply full barrier, so woken up writers
>> +     * are guaranteed to see the decrement
>> +     */
>> +    if (atomic_dec_and_test(&lock->readers))
>> +            wake_up(&lock->pending_writers);
>> +}
>> +
>> +
>> diff --git a/fs/btrfs/drw_lock.h b/fs/btrfs/drw_lock.h
>> new file mode 100644
>> index 000000000000..baff59561c06
>> --- /dev/null
>> +++ b/fs/btrfs/drw_lock.h
>> @@ -0,0 +1,23 @@
>> +#ifndef BTRFS_DRW_LOCK_H
>> +#define BTRFS_DRW_LOCK_H
>> +
>> +#include <linux/atomic.h>
>> +#include <linux/wait.h>
>> +#include <linux/percpu_counter.h>
>> +
>> +struct btrfs_drw_lock {
>> +    atomic_t readers;
>> +    struct percpu_counter writers;
>> +    wait_queue_head_t pending_writers;
>> +    wait_queue_head_t pending_readers;
>> +};
>> +
>> +void btrfs_drw_lock_init(struct btrfs_drw_lock *lock);
>> +void btrfs_drw_lock_destroy(struct btrfs_drw_lock *lock);
>> +void btrfs_drw_write_lock(struct btrfs_drw_lock *lock);
>> +bool btrfs_drw_try_write_lock(struct btrfs_drw_lock *lock);
>> +void btrfs_drw_write_unlock(struct btrfs_drw_lock *lock);
>> +void btrfs_drw_read_lock(struct btrfs_drw_lock *lock);
>> +void btrfs_drw_read_unlock(struct btrfs_drw_lock *lock);
>> +
>> +#endif
>> -- 
>> 2.17.1
>>
> 
> 
