On Apr 30, 2018, at 21:52, NeilBrown <[email protected]> wrote:
> 
> The current retry logic, to wait when a 'dying' object is found,
> spans multiple functions.  The process is attached to a waitqueue
> and set TASK_UNINTERRUPTIBLE in htable_lookup, and this status
> is passed back through lu_object_find_try() to lu_object_find_at()
> where schedule() is called and the process is removed from the queue.
> 
> This can be simplified by moving all the logic (including
> hashtable locking) inside htable_lookup(), which now never returns
> EAGAIN.
> 
> Note that htable_lookup() is called with the hash bucket lock
> held, and will drop and retake it if it needs to schedule.
> 
> I made this a 'goto' loop rather than a 'while(1)' loop as the
> diff is easier to read.
> 
> Signed-off-by: NeilBrown <[email protected]>
> ---
> drivers/staging/lustre/lustre/obdclass/lu_object.c |   73 +++++++-------------
> 1 file changed, 27 insertions(+), 46 deletions(-)
> 
> diff --git a/drivers/staging/lustre/lustre/obdclass/lu_object.c b/drivers/staging/lustre/lustre/obdclass/lu_object.c
> index 2bf089817157..93daa52e2535 100644
> --- a/drivers/staging/lustre/lustre/obdclass/lu_object.c
> +++ b/drivers/staging/lustre/lustre/obdclass/lu_object.c
> @@ -586,16 +586,21 @@ EXPORT_SYMBOL(lu_object_print);
> static struct lu_object *htable_lookup(struct lu_site *s,

It's probably a good idea to add a comment to this function noting that
it may drop and re-acquire the hash bucket lock internally.

>                                      struct cfs_hash_bd *bd,
>                                      const struct lu_fid *f,
> -                                    wait_queue_entry_t *waiter,
>                                      __u64 *version)
> {
> +     struct cfs_hash         *hs = s->ls_obj_hash;
>       struct lu_site_bkt_data *bkt;
>       struct lu_object_header *h;
>       struct hlist_node       *hnode;
> -     __u64  ver = cfs_hash_bd_version_get(bd);
> +     __u64 ver;
> +     wait_queue_entry_t waiter;
> 
> -     if (*version == ver)
> +retry:
> +     ver = cfs_hash_bd_version_get(bd);
> +
> +     if (*version == ver) {
>               return ERR_PTR(-ENOENT);
> +     }

(style) we don't need the {} around a single-line if statement

>       *version = ver;
>       bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd);
> @@ -625,11 +630,15 @@ static struct lu_object *htable_lookup(struct lu_site *s,
>        * drained), and moreover, lookup has to wait until object is freed.
>        */
> 
> -     init_waitqueue_entry(waiter, current);
> -     add_wait_queue(&bkt->lsb_marche_funebre, waiter);
> +     init_waitqueue_entry(&waiter, current);
> +     add_wait_queue(&bkt->lsb_marche_funebre, &waiter);
>       set_current_state(TASK_UNINTERRUPTIBLE);
>       lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE);
> -     return ERR_PTR(-EAGAIN);
> +     cfs_hash_bd_unlock(hs, bd, 1);

This doesn't appear to unlock and re-lock the hash bucket in the same
mode that the caller uses.  Here excl = 1, but in the caller you changed
it to excl = 0?

> +     schedule();
> +     remove_wait_queue(&bkt->lsb_marche_funebre, &waiter);

Is it worthwhile to use your new helper function here to get the wq from "s"?

> +     cfs_hash_bd_lock(hs, bd, 1);
> +     goto retry;
> }
> 
> /**
> @@ -693,13 +702,14 @@ static struct lu_object *lu_object_new(const struct lu_env *env,
> }
> 
> /**
> - * Core logic of lu_object_find*() functions.
> + * Much like lu_object_find(), but top level device of object is specifically
> + * \a dev rather than top level device of the site. This interface allows
> + * objects of different "stacking" to be created within the same site.
>  */
> -static struct lu_object *lu_object_find_try(const struct lu_env *env,
> -                                         struct lu_device *dev,
> -                                         const struct lu_fid *f,
> -                                         const struct lu_object_conf *conf,
> -                                         wait_queue_entry_t *waiter)
> +struct lu_object *lu_object_find_at(const struct lu_env *env,
> +                                 struct lu_device *dev,
> +                                 const struct lu_fid *f,
> +                                 const struct lu_object_conf *conf)
> {
>       struct lu_object      *o;
>       struct lu_object      *shadow;
> @@ -725,17 +735,16 @@ static struct lu_object *lu_object_find_try(const struct lu_env *env,
>        * It is unnecessary to perform lookup-alloc-lookup-insert, instead,
>        * just alloc and insert directly.
>        *
> -      * If dying object is found during index search, add @waiter to the
> -      * site wait-queue and return ERR_PTR(-EAGAIN).
>        */
>       if (conf && conf->loc_flags & LOC_F_NEW)
>               return lu_object_new(env, dev, f, conf);
> 
>       s  = dev->ld_site;
>       hs = s->ls_obj_hash;
> -     cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1);
> -     o = htable_lookup(s, &bd, f, waiter, &version);
> -     cfs_hash_bd_unlock(hs, &bd, 1);
> +     cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 0);
> +     o = htable_lookup(s, &bd, f, &version);
> +     cfs_hash_bd_unlock(hs, &bd, 0);

Here you changed the locking to a non-exclusive (read) lock instead of an
exclusive (write) lock.  Why?

> +
>       if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT)
>               return o;
> 
> @@ -751,7 +760,7 @@ static struct lu_object *lu_object_find_try(const struct lu_env *env,
> 
>       cfs_hash_bd_lock(hs, &bd, 1);
> 
> -     shadow = htable_lookup(s, &bd, f, waiter, &version);
> +     shadow = htable_lookup(s, &bd, f, &version);
>       if (likely(PTR_ERR(shadow) == -ENOENT)) {
>               cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash);
>               cfs_hash_bd_unlock(hs, &bd, 1);
> @@ -766,34 +775,6 @@ static struct lu_object *lu_object_find_try(const struct 
> lu_env *env,
>       lu_object_free(env, o);
>       return shadow;
> }
> -
> -/**
> - * Much like lu_object_find(), but top level device of object is specifically
> - * \a dev rather than top level device of the site. This interface allows
> - * objects of different "stacking" to be created within the same site.
> - */
> -struct lu_object *lu_object_find_at(const struct lu_env *env,
> -                                 struct lu_device *dev,
> -                                 const struct lu_fid *f,
> -                                 const struct lu_object_conf *conf)
> -{
> -     wait_queue_head_t       *wq;
> -     struct lu_object        *obj;
> -     wait_queue_entry_t         wait;
> -
> -     while (1) {
> -             obj = lu_object_find_try(env, dev, f, conf, &wait);
> -             if (obj != ERR_PTR(-EAGAIN))
> -                     return obj;
> -             /*
> -              * lu_object_find_try() already added waiter into the
> -              * wait queue.
> -              */
> -             schedule();
> -             wq = lu_site_wq_from_fid(dev->ld_site, (void *)f);
> -             remove_wait_queue(wq, &wait);
> -     }
> -}
> EXPORT_SYMBOL(lu_object_find_at);
> 
> /**
> 
> 
> _______________________________________________
> lustre-devel mailing list
> [email protected]
> http://lists.lustre.org/listinfo.cgi/lustre-devel-lustre.org

Cheers, Andreas
--
Andreas Dilger
Lustre Principal Architect
Intel Corporation







Reply via email to