> On Apr 30, 2018, at 21:52, NeilBrown <ne...@suse.com> wrote: > > > > The current retry logic, to wait when a 'dying' object is found, > > spans multiple functions. The process is attached to a waitqueue > > and set TASK_UNINTERRUPTIBLE in htable_lookup, and this status > > is passed back through lu_object_find_try() to lu_object_find_at() > > where schedule() is called and the process is removed from the queue. > > > > This can be simplified by moving all the logic (including > > hashtable locking) inside htable_lookup(), which now never returns > > EAGAIN. > > > > Note that htable_lookup() is called with the hash bucket lock > > held, and will drop and retake it if it needs to schedule. > > > > I made this a 'goto' loop rather than a 'while(1)' loop as the > > diff is easier to read. > > > > Signed-off-by: NeilBrown <ne...@suse.com> > > --- > > drivers/staging/lustre/lustre/obdclass/lu_object.c | 73 > > +++++++------------- > > 1 file changed, 27 insertions(+), 46 deletions(-) > > > > diff --git a/drivers/staging/lustre/lustre/obdclass/lu_object.c > > b/drivers/staging/lustre/lustre/obdclass/lu_object.c > > index 2bf089817157..93daa52e2535 100644 > > --- a/drivers/staging/lustre/lustre/obdclass/lu_object.c > > +++ b/drivers/staging/lustre/lustre/obdclass/lu_object.c > > @@ -586,16 +586,21 @@ EXPORT_SYMBOL(lu_object_print); > > static struct lu_object *htable_lookup(struct lu_site *s, > > It's probably a good idea to add a comment for this function that it may > drop and re-acquire the hash bucket lock internally. 
> > > struct cfs_hash_bd *bd, > > const struct lu_fid *f, > > - wait_queue_entry_t *waiter, > > __u64 *version) > > { > > + struct cfs_hash *hs = s->ls_obj_hash; > > struct lu_site_bkt_data *bkt; > > struct lu_object_header *h; > > struct hlist_node *hnode; > > - __u64 ver = cfs_hash_bd_version_get(bd); > > + __u64 ver; > > + wait_queue_entry_t waiter; > > > > - if (*version == ver) > > +retry: > > + ver = cfs_hash_bd_version_get(bd); > > + > > + if (*version == ver) { > > return ERR_PTR(-ENOENT); > > + } > > (style) we don't need the {} around a single-line if statement
I hate to be that guy, but could you run checkpatch on your patches? > > *version = ver; > > bkt = cfs_hash_bd_extra_get(s->ls_obj_hash, bd); > > @@ -625,11 +630,15 @@ static struct lu_object *htable_lookup(struct lu_site > > *s, > > * drained), and moreover, lookup has to wait until object is freed. > > */ > > > > - init_waitqueue_entry(waiter, current); > > - add_wait_queue(&bkt->lsb_marche_funebre, waiter); > > + init_waitqueue_entry(&waiter, current); > > + add_wait_queue(&bkt->lsb_marche_funebre, &waiter); > > set_current_state(TASK_UNINTERRUPTIBLE); > > lprocfs_counter_incr(s->ls_stats, LU_SS_CACHE_DEATH_RACE); > > - return ERR_PTR(-EAGAIN); > > + cfs_hash_bd_unlock(hs, bd, 1); > > This looks like it isn't unlocking and locking the hash bucket in the same > manner that it was done in the caller. Here excl = 1, but in the caller > you changed it to excl = 0? This is very much like the work done by Lai. The difference is that Lai removed the work queue handling completely in htable_lookup(). You can see the details at https://jira.hpdd.intel.com/browse/LU-9049. I will push the missing lu_object fixes, including LU-9049, on top of your patch set so you can see the approach Lai took. From there we can figure out how to merge the lu_object work and fix the issues Andreas and I pointed out. > > + schedule(); > > + remove_wait_queue(&bkt->lsb_marche_funebre, &waiter); > > Is it worthwhile to use your new helper function here to get the wq from "s"? > > > + cfs_hash_bd_lock(hs, bd, 1); > > + goto retry; > > } > > > > /** > > @@ -693,13 +702,14 @@ static struct lu_object *lu_object_new(const struct > > lu_env *env, > > } > > > > /** > > - * Core logic of lu_object_find*() functions. > > + * Much like lu_object_find(), but top level device of object is > > specifically > > + * \a dev rather than top level device of the site. This interface allows > > + * objects of different "stacking" to be created within the same site. 
> > */ > > -static struct lu_object *lu_object_find_try(const struct lu_env *env, > > - struct lu_device *dev, > > - const struct lu_fid *f, > > - const struct lu_object_conf *conf, > > - wait_queue_entry_t *waiter) > > +struct lu_object *lu_object_find_at(const struct lu_env *env, > > + struct lu_device *dev, > > + const struct lu_fid *f, > > + const struct lu_object_conf *conf) > > { > > struct lu_object *o; > > struct lu_object *shadow; > > @@ -725,17 +735,16 @@ static struct lu_object *lu_object_find_try(const > > struct lu_env *env, > > * It is unnecessary to perform lookup-alloc-lookup-insert, instead, > > * just alloc and insert directly. > > * > > - * If dying object is found during index search, add @waiter to the > > - * site wait-queue and return ERR_PTR(-EAGAIN). > > */ > > if (conf && conf->loc_flags & LOC_F_NEW) > > return lu_object_new(env, dev, f, conf); > > > > s = dev->ld_site; > > hs = s->ls_obj_hash; > > - cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 1); > > - o = htable_lookup(s, &bd, f, waiter, &version); > > - cfs_hash_bd_unlock(hs, &bd, 1); > > + cfs_hash_bd_get_and_lock(hs, (void *)f, &bd, 0); > > + o = htable_lookup(s, &bd, f, &version); > > + cfs_hash_bd_unlock(hs, &bd, 0); > > Here you changed the locking to a non-exclusive (read) lock instead of an > exclusive (write) lock? Why. I have the same question. 
> > > + > > if (!IS_ERR(o) || PTR_ERR(o) != -ENOENT) > > return o; > > > > @@ -751,7 +760,7 @@ static struct lu_object *lu_object_find_try(const > > struct lu_env *env, > > > > cfs_hash_bd_lock(hs, &bd, 1); > > > > - shadow = htable_lookup(s, &bd, f, waiter, &version); > > + shadow = htable_lookup(s, &bd, f, &version); > > if (likely(PTR_ERR(shadow) == -ENOENT)) { > > cfs_hash_bd_add_locked(hs, &bd, &o->lo_header->loh_hash); > > cfs_hash_bd_unlock(hs, &bd, 1); > > @@ -766,34 +775,6 @@ static struct lu_object *lu_object_find_try(const > > struct lu_env *env, > > lu_object_free(env, o); > > return shadow; > > } > > - > > -/** > > - * Much like lu_object_find(), but top level device of object is > > specifically > > - * \a dev rather than top level device of the site. This interface allows > > - * objects of different "stacking" to be created within the same site. > > - */ > > -struct lu_object *lu_object_find_at(const struct lu_env *env, > > - struct lu_device *dev, > > - const struct lu_fid *f, > > - const struct lu_object_conf *conf) > > -{ > > - wait_queue_head_t *wq; > > - struct lu_object *obj; > > - wait_queue_entry_t wait; > > - > > - while (1) { > > - obj = lu_object_find_try(env, dev, f, conf, &wait); > > - if (obj != ERR_PTR(-EAGAIN)) > > - return obj; > > - /* > > - * lu_object_find_try() already added waiter into the > > - * wait queue. > > - */ > > - schedule(); > > - wq = lu_site_wq_from_fid(dev->ld_site, (void *)f); > > - remove_wait_queue(wq, &wait); > > - } > > -} > > EXPORT_SYMBOL(lu_object_find_at); > > > > /** > > > > > > _______________________________________________ > > lustre-devel mailing list > > lustre-de...@lists.lustre.org > > http://lists.lustre.org/listinfo.cgi/lustre-devel-lustre.org > > Cheers, Andreas > -- > Andreas Dilger > Lustre Principal Architect > Intel Corporation > > > > > > > >