d_alloc_parallel() currently requires a wait_queue_head to be passed in.
This must have a lifetime which extends until the lookup is completed.

Future proposed patches will use d_alloc_parallel() for names being
created/unlinked etc.  Some filesystems combine lookup with create,
making a longer code path over which the wq needs to stay alive.  If the
wq must still be allocated on-stack this can be cumbersome.

This patch replaces the on-stack wqs with a global array of wqs which
are used as needed.  A wq is NOT allocated when a dentry is first
created but only when a second thread attempts to use the same name and
so is forced to wait.  At this moment a wq is chosen using a hash of the
dentry pointer and that wq is assigned to ->d_wait.  The ->d_lock is
then dropped and the task waits.

When the dentry is finally moved out of "in_lookup" a wake up is only
sent if ->d_wait is not NULL.  This avoids an (uncontended) spin
lock/unlock, which saves a couple of atomic operations in the common case.

The wake up passes the dentry that the wake up is for as the "key" and
the wake function will only wake tasks waiting on the same key.  This
means that when these global waitqueues are shared (which is inevitable
though unlikely to be frequent), a task will not be woken prematurely.

Signed-off-by: NeilBrown <n...@brown.name>
---
 Documentation/filesystems/porting.rst |  6 +++
 fs/afs/dir_silly.c                    |  4 +-
 fs/dcache.c                           | 77 ++++++++++++++++++++++-----
 fs/fuse/readdir.c                     |  3 +-
 fs/namei.c                            |  6 +--
 fs/nfs/dir.c                          |  6 +--
 fs/nfs/unlink.c                       |  3 +-
 fs/proc/base.c                        |  3 +-
 fs/proc/proc_sysctl.c                 |  3 +-
 fs/smb/client/readdir.c               |  3 +-
 include/linux/dcache.h                |  3 +-
 include/linux/nfs_xdr.h               |  1 -
 12 files changed, 80 insertions(+), 38 deletions(-)

diff --git a/Documentation/filesystems/porting.rst 
b/Documentation/filesystems/porting.rst
index 85f590254f07..e4a326e8fa4c 100644
--- a/Documentation/filesystems/porting.rst
+++ b/Documentation/filesystems/porting.rst
@@ -1285,3 +1285,9 @@ rather than a VMA, as the VMA at this stage is not yet 
valid.
 The vm_area_desc provides the minimum required information for a filesystem
 to initialise state upon memory mapping of a file-backed region, and output
 parameters for the file system to set this state.
+---
+
+**mandatory**
+
+d_alloc_parallel() no longer requires a waitqueue_head.  It uses one
+from an internal table when needed.
diff --git a/fs/afs/dir_silly.c b/fs/afs/dir_silly.c
index 0b80eb93fa40..ce76b3b30850 100644
--- a/fs/afs/dir_silly.c
+++ b/fs/afs/dir_silly.c
@@ -237,13 +237,11 @@ int afs_silly_iput(struct dentry *dentry, struct inode 
*inode)
        struct dentry *alias;
        int ret;
 
-       DECLARE_WAIT_QUEUE_HEAD_ONSTACK(wq);
-
        _enter("%p{%pd},%llx", dentry, dentry, vnode->fid.vnode);
 
        down_read(&dvnode->rmdir_lock);
 
-       alias = d_alloc_parallel(dentry->d_parent, &dentry->d_name, &wq);
+       alias = d_alloc_parallel(dentry->d_parent, &dentry->d_name);
        if (IS_ERR(alias)) {
                up_read(&dvnode->rmdir_lock);
                return 0;
diff --git a/fs/dcache.c b/fs/dcache.c
index 0db256098adb..5473d906783e 100644
--- a/fs/dcache.c
+++ b/fs/dcache.c
@@ -2137,8 +2137,7 @@ struct dentry *d_add_ci(struct dentry *dentry, struct 
inode *inode,
                return found;
        }
        if (d_in_lookup(dentry)) {
-               found = d_alloc_parallel(dentry->d_parent, name,
-                                       dentry->d_wait);
+               found = d_alloc_parallel(dentry->d_parent, name);
                if (IS_ERR(found) || !d_in_lookup(found)) {
                        iput(inode);
                        return found;
@@ -2148,7 +2147,7 @@ struct dentry *d_add_ci(struct dentry *dentry, struct 
inode *inode,
                if (!found) {
                        iput(inode);
                        return ERR_PTR(-ENOMEM);
-               } 
+               }
        }
        res = d_splice_alias(inode, found);
        if (res) {
@@ -2505,6 +2504,46 @@ void d_rehash(struct dentry * entry)
 }
 EXPORT_SYMBOL(d_rehash);
 
+#define        PAR_LOOKUP_WQ_BITS      8
+#define PAR_LOOKUP_WQS (1 << PAR_LOOKUP_WQ_BITS)
+static wait_queue_head_t par_wait_table[PAR_LOOKUP_WQS] __cacheline_aligned;
+
+static int __init par_wait_init(void)
+{
+       int i;
+
+       for (i = 0; i < PAR_LOOKUP_WQS; i++)
+               init_waitqueue_head(&par_wait_table[i]);
+       return 0;
+}
+fs_initcall(par_wait_init);
+
+struct par_wait_key {
+       struct dentry *de;
+       struct wait_queue_entry wqe;
+};
+
+static int d_wait_wake_fn(struct wait_queue_entry *wq_entry,
+                         unsigned mode, int sync, void *key)
+{
+       struct par_wait_key *pwk = container_of(wq_entry,
+                                                struct par_wait_key, wqe);
+       if (pwk->de == key)
+               return default_wake_function(wq_entry, mode, sync, key);
+       return 0;
+}
+
+static inline void d_wake_waiters(struct wait_queue_head *d_wait,
+                                 struct dentry *dentry)
+{
+       /* ->d_wait is only set if some thread is actually waiting.
+        * If we find it is NULL - the common case - then there was no
+        * contention and there are no waiters to be woken.
+        */
+       if (d_wait)
+               __wake_up(d_wait, TASK_NORMAL, 0, dentry);
+}
+
 static inline unsigned start_dir_add(struct inode *dir)
 {
        preempt_disable_nested();
@@ -2517,31 +2556,41 @@ static inline unsigned start_dir_add(struct inode *dir)
 }
 
 static inline void end_dir_add(struct inode *dir, unsigned int n,
-                              wait_queue_head_t *d_wait)
+                              wait_queue_head_t *d_wait, struct dentry *de)
 {
        smp_store_release(&dir->i_dir_seq, n + 2);
        preempt_enable_nested();
-       if (wq_has_sleeper(d_wait))
-               wake_up_all(d_wait);
+       d_wake_waiters(d_wait, de);
 }
 
 static void d_wait_lookup(struct dentry *dentry)
 {
        if (d_in_lookup(dentry)) {
-               DECLARE_WAITQUEUE(wait, current);
-               add_wait_queue(dentry->d_wait, &wait);
+               struct par_wait_key wk = {
+                       .de = dentry,
+                       .wqe = {
+                               .private = current,
+                               .func = d_wait_wake_fn,
+                       },
+               };
+               struct wait_queue_head *wq;
+               if (!dentry->d_wait)
+                       dentry->d_wait = &par_wait_table[hash_ptr(dentry,
+                                                                 PAR_LOOKUP_WQ_BITS)];
+               wq = dentry->d_wait;
+               add_wait_queue(wq, &wk.wqe);
                do {
                        set_current_state(TASK_UNINTERRUPTIBLE);
                        spin_unlock(&dentry->d_lock);
                        schedule();
                        spin_lock(&dentry->d_lock);
                } while (d_in_lookup(dentry));
+               remove_wait_queue(wq, &wk.wqe);
        }
 }
 
 struct dentry *d_alloc_parallel(struct dentry *parent,
-                               const struct qstr *name,
-                               wait_queue_head_t *wq)
+                               const struct qstr *name)
 {
        unsigned int hash = name->hash;
        struct hlist_bl_head *b = in_lookup_hash(parent, hash);
@@ -2554,6 +2603,7 @@ struct dentry *d_alloc_parallel(struct dentry *parent,
                return ERR_PTR(-ENOMEM);
 
        new->d_flags |= DCACHE_PAR_LOOKUP;
+       new->d_wait = NULL;
        spin_lock(&parent->d_lock);
        new->d_parent = dget_dlock(parent);
        hlist_add_head(&new->d_sib, &parent->d_children);
@@ -2642,7 +2692,6 @@ struct dentry *d_alloc_parallel(struct dentry *parent,
                return dentry;
        }
        rcu_read_unlock();
-       new->d_wait = wq;
        hlist_bl_add_head(&new->d_u.d_in_lookup_hash, b);
        hlist_bl_unlock(b);
        return new;
@@ -2680,7 +2729,7 @@ static wait_queue_head_t *__d_lookup_unhash(struct dentry 
*dentry)
 void __d_lookup_unhash_wake(struct dentry *dentry)
 {
        spin_lock(&dentry->d_lock);
-       wake_up_all(__d_lookup_unhash(dentry));
+       d_wake_waiters(__d_lookup_unhash(dentry), dentry);
        spin_unlock(&dentry->d_lock);
 }
 EXPORT_SYMBOL(__d_lookup_unhash_wake);
@@ -2715,7 +2764,7 @@ static inline void __d_add(struct dentry *dentry, struct 
inode *inode,
                           (DCACHE_LRU_LIST|DCACHE_SHRINK_LIST)) == 
DCACHE_LRU_LIST)
                this_cpu_dec(nr_dentry_negative);
        if (dir)
-               end_dir_add(dir, n, d_wait);
+               end_dir_add(dir, n, d_wait, dentry);
        spin_unlock(&dentry->d_lock);
        if (inode)
                spin_unlock(&inode->i_lock);
@@ -2881,7 +2930,7 @@ static void __d_move(struct dentry *dentry, struct dentry 
*target,
        write_seqcount_end(&dentry->d_seq);
 
        if (dir)
-               end_dir_add(dir, n, d_wait);
+               end_dir_add(dir, n, d_wait, target);
 
        if (dentry->d_parent != old_parent)
                spin_unlock(&dentry->d_parent->d_lock);
diff --git a/fs/fuse/readdir.c b/fs/fuse/readdir.c
index c2aae2eef086..f588252891af 100644
--- a/fs/fuse/readdir.c
+++ b/fs/fuse/readdir.c
@@ -160,7 +160,6 @@ static int fuse_direntplus_link(struct file *file,
        struct inode *dir = d_inode(parent);
        struct fuse_conn *fc;
        struct inode *inode;
-       DECLARE_WAIT_QUEUE_HEAD_ONSTACK(wq);
        int epoch;
 
        if (!o->nodeid) {
@@ -197,7 +196,7 @@ static int fuse_direntplus_link(struct file *file,
        dentry = d_lookup(parent, &name);
        if (!dentry) {
 retry:
-               dentry = d_alloc_parallel(parent, &name, &wq);
+               dentry = d_alloc_parallel(parent, &name);
                if (IS_ERR(dentry))
                        return PTR_ERR(dentry);
        }
diff --git a/fs/namei.c b/fs/namei.c
index fb075573157a..2c98672fdb6a 100644
--- a/fs/namei.c
+++ b/fs/namei.c
@@ -2012,13 +2012,12 @@ static struct dentry *__lookup_slow(const struct qstr 
*name,
 {
        struct dentry *dentry, *old;
        struct inode *inode = dir->d_inode;
-       DECLARE_WAIT_QUEUE_HEAD_ONSTACK(wq);
 
        /* Don't go there if it's already dead */
        if (unlikely(IS_DEADDIR(inode)))
                return ERR_PTR(-ENOENT);
 again:
-       dentry = d_alloc_parallel(dir, name, &wq);
+       dentry = d_alloc_parallel(dir, name);
        if (IS_ERR(dentry))
                return dentry;
        if (unlikely(!d_in_lookup(dentry))) {
@@ -4028,7 +4027,6 @@ static struct dentry *lookup_open(struct nameidata *nd, 
struct file *file,
        struct dentry *dentry;
        int error, create_error = 0;
        umode_t mode = op->mode;
-       DECLARE_WAIT_QUEUE_HEAD_ONSTACK(wq);
 
        if (unlikely(IS_DEADDIR(dir_inode)))
                return ERR_PTR(-ENOENT);
@@ -4037,7 +4035,7 @@ static struct dentry *lookup_open(struct nameidata *nd, 
struct file *file,
        dentry = d_lookup(dir, &nd->last);
        for (;;) {
                if (!dentry) {
-                       dentry = d_alloc_parallel(dir, &nd->last, &wq);
+                       dentry = d_alloc_parallel(dir, &nd->last);
                        if (IS_ERR(dentry))
                                return dentry;
                }
diff --git a/fs/nfs/dir.c b/fs/nfs/dir.c
index 250a826d5480..bbeb24805a0e 100644
--- a/fs/nfs/dir.c
+++ b/fs/nfs/dir.c
@@ -727,7 +727,6 @@ void nfs_prime_dcache(struct dentry *parent, struct 
nfs_entry *entry,
                unsigned long dir_verifier)
 {
        struct qstr filename = QSTR_INIT(entry->name, entry->len);
-       DECLARE_WAIT_QUEUE_HEAD_ONSTACK(wq);
        struct dentry *dentry;
        struct dentry *alias;
        struct inode *inode;
@@ -756,7 +755,7 @@ void nfs_prime_dcache(struct dentry *parent, struct 
nfs_entry *entry,
        dentry = d_lookup(parent, &filename);
 again:
        if (!dentry) {
-               dentry = d_alloc_parallel(parent, &filename, &wq);
+               dentry = d_alloc_parallel(parent, &filename);
                if (IS_ERR(dentry))
                        return;
        }
@@ -2060,7 +2059,6 @@ int nfs_atomic_open(struct inode *dir, struct dentry 
*dentry,
                    struct file *file, unsigned open_flags,
                    umode_t mode)
 {
-       DECLARE_WAIT_QUEUE_HEAD_ONSTACK(wq);
        struct nfs_open_context *ctx;
        struct dentry *res;
        struct iattr attr = { .ia_valid = ATTR_OPEN };
@@ -2116,7 +2114,7 @@ int nfs_atomic_open(struct inode *dir, struct dentry 
*dentry,
                d_drop(dentry);
                switched = true;
                dentry = d_alloc_parallel(dentry->d_parent,
-                                         &dentry->d_name, &wq);
+                                         &dentry->d_name);
                if (IS_ERR(dentry))
                        return PTR_ERR(dentry);
                if (unlikely(!d_in_lookup(dentry)))
diff --git a/fs/nfs/unlink.c b/fs/nfs/unlink.c
index b55467911648..894af85830fa 100644
--- a/fs/nfs/unlink.c
+++ b/fs/nfs/unlink.c
@@ -124,7 +124,7 @@ static int nfs_call_unlink(struct dentry *dentry, struct 
inode *inode, struct nf
        struct dentry *alias;
 
        down_read_non_owner(&NFS_I(dir)->rmdir_sem);
-       alias = d_alloc_parallel(dentry->d_parent, &data->args.name, &data->wq);
+       alias = d_alloc_parallel(dentry->d_parent, &data->args.name);
        if (IS_ERR(alias)) {
                up_read_non_owner(&NFS_I(dir)->rmdir_sem);
                return 0;
@@ -185,7 +185,6 @@ nfs_async_unlink(struct dentry *dentry, const struct qstr 
*name)
 
        data->cred = get_current_cred();
        data->res.dir_attr = &data->dir_attr;
-       init_waitqueue_head(&data->wq);
 
        status = -EBUSY;
        spin_lock(&dentry->d_lock);
diff --git a/fs/proc/base.c b/fs/proc/base.c
index 62d35631ba8c..0b296c94000e 100644
--- a/fs/proc/base.c
+++ b/fs/proc/base.c
@@ -2129,8 +2129,7 @@ bool proc_fill_cache(struct file *file, struct 
dir_context *ctx,
 
        child = try_lookup_noperm(&qname, dir);
        if (!child) {
-               DECLARE_WAIT_QUEUE_HEAD_ONSTACK(wq);
-               child = d_alloc_parallel(dir, &qname, &wq);
+               child = d_alloc_parallel(dir, &qname);
                if (IS_ERR(child))
                        goto end_instantiate;
                if (d_in_lookup(child)) {
diff --git a/fs/proc/proc_sysctl.c b/fs/proc/proc_sysctl.c
index 49ab74e0bfde..04a382178c65 100644
--- a/fs/proc/proc_sysctl.c
+++ b/fs/proc/proc_sysctl.c
@@ -692,8 +692,7 @@ static bool proc_sys_fill_cache(struct file *file,
 
        child = d_lookup(dir, &qname);
        if (!child) {
-               DECLARE_WAIT_QUEUE_HEAD_ONSTACK(wq);
-               child = d_alloc_parallel(dir, &qname, &wq);
+               child = d_alloc_parallel(dir, &qname);
                if (IS_ERR(child))
                        return false;
                if (d_in_lookup(child)) {
diff --git a/fs/smb/client/readdir.c b/fs/smb/client/readdir.c
index 4e5460206397..5a92a1ad317d 100644
--- a/fs/smb/client/readdir.c
+++ b/fs/smb/client/readdir.c
@@ -74,7 +74,6 @@ cifs_prime_dcache(struct dentry *parent, struct qstr *name,
        struct cifs_sb_info *cifs_sb = CIFS_SB(sb);
        bool posix = cifs_sb_master_tcon(cifs_sb)->posix_extensions;
        bool reparse_need_reval = false;
-       DECLARE_WAIT_QUEUE_HEAD_ONSTACK(wq);
        int rc;
 
        cifs_dbg(FYI, "%s: for %s\n", __func__, name->name);
@@ -106,7 +105,7 @@ cifs_prime_dcache(struct dentry *parent, struct qstr *name,
                    (fattr->cf_flags & CIFS_FATTR_NEED_REVAL))
                        return;
 
-               dentry = d_alloc_parallel(parent, name, &wq);
+               dentry = d_alloc_parallel(parent, name);
        }
        if (IS_ERR(dentry))
                return;
diff --git a/include/linux/dcache.h b/include/linux/dcache.h
index 5d53489e5556..996259d1bc88 100644
--- a/include/linux/dcache.h
+++ b/include/linux/dcache.h
@@ -241,8 +241,7 @@ extern void d_delete(struct dentry *);
 /* allocate/de-allocate */
 extern struct dentry * d_alloc(struct dentry *, const struct qstr *);
 extern struct dentry * d_alloc_anon(struct super_block *);
-extern struct dentry * d_alloc_parallel(struct dentry *, const struct qstr *,
-                                       wait_queue_head_t *);
+extern struct dentry * d_alloc_parallel(struct dentry *, const struct qstr *);
 extern struct dentry * d_splice_alias(struct inode *, struct dentry *);
 /* weird procfs mess; *NOT* exported */
 extern struct dentry * d_splice_alias_ops(struct inode *, struct dentry *,
diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h
index ac4bff6e9913..197c9b30dfdf 100644
--- a/include/linux/nfs_xdr.h
+++ b/include/linux/nfs_xdr.h
@@ -1735,7 +1735,6 @@ struct nfs_unlinkdata {
        struct nfs_removeargs args;
        struct nfs_removeres res;
        struct dentry *dentry;
-       wait_queue_head_t wq;
        const struct cred *cred;
        struct nfs_fattr dir_attr;
        long timeout;
-- 
2.50.0.107.gf914562f5916.dirty


Reply via email to