> -----Original Message-----
> From: Tuong Lien <tuong.t.l...@dektech.com.au>
> Sent: 6-Nov-19 05:39
> To: tipc-discussion@lists.sourceforge.net; Jon Maloy 
> <jon.ma...@ericsson.com>; ma...@donjonn.com;
> ying....@windriver.com
> Subject: [PATCH RFC] tipc: fix name table rbtree issues
> 
> The current rbtree for service ranges in the name table is built based
> on the 'lower' & 'upper' range values resulting in a flaw in the rbtree
> searching. Some issues have been observed in case of range overlapping:
> 
> Case #1: unable to withdraw a name entry:
> After some name services are bound, all of them are withdrawn by user
> but one remains in the name table forever. This corrupts the table and
> that service becomes dummy i.e. no real port.
> E.g.
> 
>                 /
>            {22, 22}
>               /
>              /
>    --->  {10, 50}
>            /  \
>           /    \
>     {10, 30}  {20, 60}
> 
> The node {10, 30} cannot be removed since the rbtree searching stops at
> the node's ancestor i.e. {10, 50}, so starting from it will never reach
> the finding node.
> 
> Case #2: failed to send data in some cases:
> E.g. Two service ranges: {20, 60}, {10, 50} are bound. The rbtree for
> this service will be one of the two cases below depending on the order
> of the bindings:
> 
>         {20, 60}             {10, 50} <--
>           /  \                 /  \
>          /    \               /    \
>     {10, 50}  NIL <--       NIL  {20, 60}
> 
>           (a)                    (b)
> 
> Now, try to send some data to service {30}, there will be two results:
> (a): Failed, no route to host.
> (b): Ok.
> 
> The reason is that the rbtree searching will stop at the pointing node
> as shown above.
> 
> Case #3: no round-robin in data sending:
> Same as case #2b above, the data sending to service {30} will always
> arrive in the {10, 50}.

I wouldn't mention case #3 at all. If we could make this work (which I doubt, 
see my previous mail) it would in reality imply new functionality. -The old 
functionality, round-robin between identical ranges does still work, as far as 
I can understand. 
> 
> Case #4: failed to send data:
> Same as case #2b above but if the data sending's scope is local and the
> {10, 50} is published by a peer node, then it will result in "no route
> to host" even though the other {20, 60} is for example on the local
> node which should be able to get the data.
> 
> The issues are actually due to the way we built the rbtree. This commit
> fixes it by introducing an additional field to each node, named 'max',
> which is the largest 'upper' of that node subtree. The 'max' value for
> each subtrees will be propagated correctly whenever a node is inserted/
> removed or the tree is rebalanced by the augmented rbtree callbacks.
> 
> By this way, we can change the rbtree searching appoarch to solve the
> issues above. Case #3 is however not covered by this commit, we leave
> it as current until one is proven to need a round-robin fashion for it.
> 
> Besides, since now we have the 'max' value, we can even improve the
> searching for a next range matching e.g. in case of multicast, so get
> rid of the unneeded looping over all the nodes in the tree.
> 
> Signed-off-by: Tuong Lien <tuong.t.l...@dektech.com.au>
> ---
>  net/tipc/name_table.c | 268 
> +++++++++++++++++++++++++++++++++-----------------
>  1 file changed, 179 insertions(+), 89 deletions(-)
> 
> diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
> index 66a65c2cdb23..5cac6c1dfeb0 100644
> --- a/net/tipc/name_table.c
> +++ b/net/tipc/name_table.c
> @@ -35,6 +35,7 @@
>   */
> 
>  #include <net/sock.h>
> +#include <linux/rbtree_augmented.h>
>  #include "core.h"
>  #include "netlink.h"
>  #include "name_table.h"
> @@ -50,6 +51,7 @@
>   * @lower: service range lower bound
>   * @upper: service range upper bound
>   * @tree_node: member of service range RB tree
> + * @max: largest 'upper' in this node subtree
>   * @local_publ: list of identical publications made from this node
>   *   Used by closest_first lookup and multicast lookup algorithm
>   * @all_publ: all publications identical to this one, whatever node and scope
> @@ -59,6 +61,7 @@ struct service_range {
>       u32 lower;
>       u32 upper;
>       struct rb_node tree_node;
> +     u32 max;
>       struct list_head local_publ;
>       struct list_head all_publ;
>  };
> @@ -81,6 +84,130 @@ struct tipc_service {
>       struct rcu_head rcu;
>  };
> 
> +#define service_range_upper(sr) ((sr)->upper)
> +RB_DECLARE_CALLBACKS_MAX(static, sr_callbacks,
> +                      struct service_range, tree_node, u32, max,
> +                      service_range_upper)
> +
> +#define service_range_entry(rbtree_node)                             \
> +     (container_of(rbtree_node, struct service_range, tree_node))
> +
> +#define service_range_overlap(sr, start, end)                                
> \
> +     ((sr)->lower <= (end) && (sr)->upper >= (start))
> +
> +/**
> + * service_range_foreach_match - iterate over tipc service rbtree for each
> + *                               range match
> + * @sr: the service range pointer as a loop cursor
> + * @sc: the pointer to tipc service which holds the service range rbtree
> + * @start, end: the range (end >= start) for matching
> + */
> +#define service_range_foreach_match(sr, sc, start, end)                      
> \
> +     for (sr = service_range_match_first((sc)->ranges.rb_node,       \
> +                                         (start),                    \
> +                                         (end));                     \
> +          sr;                                                        \
> +          sr = service_range_match_next(&(sr)->tree_node,            \
> +                                        (start),                     \
> +                                        (end)))
> +
> +/**
> + * service_range_match_first - find first service range matching a range
> + * @n: the root node of service range rbtree for searching
> + * @start, end: the range (end >= start) for matching
> + *
> + * Return: the leftmost service range node in the rbtree that overlaps the
> + * specific range if any. Otherwise, returns NULL.
> + */
> +static struct service_range *service_range_match_first(struct rb_node *n,
> +                                                    u32 start, u32 end)
> +{
> +     struct service_range *sr;
> +     struct rb_node *l, *r;
> +
> +     /* Non overlaps in tree at all? */
> +     if (!n || service_range_entry(n)->max < start)
> +             return NULL;
> +
> +     while (n) {
> +             l = n->rb_left;
> +             if (l && service_range_entry(l)->max >= start) {
> +                     /* A leftmost overlap range node must be one in the left
> +                      * subtree. If not, it has lower > end, then nodes on
> +                      * the right side cannot satisfy the condition either.
> +                      */
> +                     n = l;
> +                     continue;
> +             }
> +
> +             /* No one in the left subtree can match, return if this node is
> +              * an overlap i.e. leftmost.
> +              */
> +             sr = service_range_entry(n);
> +             if (service_range_overlap(sr, start, end))
> +                     return sr;
> +
> +             /* Ok, try to lookup on the right side */
> +             r = n->rb_right;
> +             if (sr->lower <= end &&
> +                 r && service_range_entry(r)->max >= start) {
> +                     n = r;
> +                     continue;
> +             }
> +             break;
> +     }
> +
> +     return NULL;
> +}
> +
> +/**
> + * service_range_match_next - find next service range matching a range
> + * @n: a node in service range rbtree from which the searching starts
> + * @start, end: the range (end >= start) for matching
> + *
> + * Return: the next service range node to the given node in the rbtree that
> + * overlaps the specific range if any. Otherwise, returns NULL.
> + */
> +static struct service_range *service_range_match_next(struct rb_node *n,
> +                                                   u32 start, u32 end)
> +{
> +     struct service_range *sr;
> +     struct rb_node *p, *r;
> +
> +     while (n) {
> +             r = n->rb_right;
> +             if (r && service_range_entry(r)->max >= start)
> +                     /* A next overlap range node must be one in the right
> +                      * subtree. If not, it has lower > end, then any next
> +                      * successor (- an ancestor) of this node cannot
> +                      * satisfy the condition either.
> +                      */
> +                     return service_range_match_first(r, start, end);
> +
> +             /* No one in the right subtree can match, go up to find an
> +              * ancestor of this node which is parent of a left-hand child.
> +              */
> +             while ((p = rb_parent(n)) && n == p->rb_right)
> +                     n = p;
> +             if (!p)
> +                     break;
> +
> +             /* Return if this ancestor is an overlap */
> +             sr = service_range_entry(p);
> +             if (service_range_overlap(sr, start, end))
> +                     return sr;
> +
> +             /* Ok, try to lookup more from this ancestor */
> +             if (sr->lower <= end) {
> +                     n = p;
> +                     continue;
> +             }
> +             break;
> +     }
> +
> +     return NULL;
> +}
> +
>  static int hash(int x)
>  {
>       return x & (TIPC_NAMETBL_SIZE - 1);
> @@ -143,19 +270,8 @@ static struct tipc_service *tipc_service_create(u32 
> type, struct hlist_head *hd)
>  static struct service_range *tipc_service_first_range(struct tipc_service 
> *sc,
>                                                     u32 instance)
>  {
> -     struct rb_node *n = sc->ranges.rb_node;
> -     struct service_range *sr;
> -
> -     while (n) {
> -             sr = container_of(n, struct service_range, tree_node);
> -             if (sr->lower > instance)
> -                     n = n->rb_left;
> -             else if (sr->upper < instance)
> -                     n = n->rb_right;
> -             else
> -                     return sr;
> -     }
> -     return NULL;
> +     return service_range_match_first(sc->ranges.rb_node, instance,
> +                                      instance);
>  }

This function looks redundant now. It is called from only one location, and 
could just as well be replaced with a direct call to 
service_range_match_first().

Otherwise this looks good.

Acked-by: jon


> 
>  /*  tipc_service_find_range - find service range matching publication 
> parameters
> @@ -163,56 +279,46 @@ static struct service_range 
> *tipc_service_first_range(struct tipc_service *sc,
>  static struct service_range *tipc_service_find_range(struct tipc_service *sc,
>                                                    u32 lower, u32 upper)
>  {
> -     struct rb_node *n = sc->ranges.rb_node;
>       struct service_range *sr;
> 
> -     sr = tipc_service_first_range(sc, lower);
> -     if (!sr)
> -             return NULL;
> -
> -     /* Look for exact match */
> -     for (n = &sr->tree_node; n; n = rb_next(n)) {
> -             sr = container_of(n, struct service_range, tree_node);
> -             if (sr->upper == upper)
> -                     break;
> +     service_range_foreach_match(sr, sc, lower, upper) {
> +             /* Look for exact match */
> +             if (sr->lower == lower && sr->upper == upper)
> +                     return sr;
>       }
> -     if (!n || sr->lower != lower || sr->upper != upper)
> -             return NULL;
> 
> -     return sr;
> +     return NULL;
>  }
> 
>  static struct service_range *tipc_service_create_range(struct tipc_service 
> *sc,
>                                                      u32 lower, u32 upper)
>  {
>       struct rb_node **n, *parent = NULL;
> -     struct service_range *sr, *tmp;
> +     struct service_range *sr;
> 
>       n = &sc->ranges.rb_node;
>       while (*n) {
> -             tmp = container_of(*n, struct service_range, tree_node);
>               parent = *n;
> -             tmp = container_of(parent, struct service_range, tree_node);
> -             if (lower < tmp->lower)
> -                     n = &(*n)->rb_left;
> -             else if (lower > tmp->lower)
> -                     n = &(*n)->rb_right;
> -             else if (upper < tmp->upper)
> -                     n = &(*n)->rb_left;
> -             else if (upper > tmp->upper)
> -                     n = &(*n)->rb_right;
> +             sr = service_range_entry(parent);
> +             if (lower == sr->lower && upper == sr->upper)
> +                     return sr;
> +             if (sr->max < upper)
> +                     sr->max = upper;
> +             if (lower <= sr->lower)
> +                     n = &parent->rb_left;
>               else
> -                     return tmp;
> +                     n = &parent->rb_right;
>       }
>       sr = kzalloc(sizeof(*sr), GFP_ATOMIC);
>       if (!sr)
>               return NULL;
>       sr->lower = lower;
>       sr->upper = upper;
> +     sr->max = upper;
>       INIT_LIST_HEAD(&sr->local_publ);
>       INIT_LIST_HEAD(&sr->all_publ);
>       rb_link_node(&sr->tree_node, parent, n);
> -     rb_insert_color(&sr->tree_node, &sc->ranges);
> +     rb_insert_augmented(&sr->tree_node, &sc->ranges, &sr_callbacks);
>       return sr;
>  }
> 
> @@ -289,7 +395,6 @@ static void tipc_service_subscribe(struct tipc_service 
> *service,
>       struct service_range *sr;
>       struct tipc_name_seq ns;
>       struct publication *p;
> -     struct rb_node *n;
>       bool first;
> 
>       ns.type = tipc_sub_read(sb, seq.type);
> @@ -302,14 +407,8 @@ static void tipc_service_subscribe(struct tipc_service 
> *service,
>       if (tipc_sub_read(sb, filter) & TIPC_SUB_NO_STATUS)
>               return;
> 
> -     for (n = rb_first(&service->ranges); n; n = rb_next(n)) {
> -             sr = container_of(n, struct service_range, tree_node);
> -             if (sr->lower > ns.upper)
> -                     break;
> -             if (!tipc_sub_check_overlap(&ns, sr->lower, sr->upper))
> -                     continue;
> +     service_range_foreach_match(sr, service, ns.lower, ns.upper) {
>               first = true;
> -
>               list_for_each_entry(p, &sr->all_publ, all_publ) {
>                       tipc_sub_report_overlap(sub, sr->lower, sr->upper,
>                                               TIPC_PUBLISHED, p->port,
> @@ -390,7 +489,7 @@ struct publication *tipc_nametbl_remove_publ(struct net 
> *net, u32 type,
> 
>       /* Remove service range item if this was its last publication */
>       if (list_empty(&sr->all_publ)) {
> -             rb_erase(&sr->tree_node, &sc->ranges);
> +             rb_erase_augmented(&sr->tree_node, &sc->ranges, &sr_callbacks);
>               kfree(sr);
>       }
> 
> @@ -438,34 +537,39 @@ u32 tipc_nametbl_translate(struct net *net, u32 type, 
> u32 instance, u32
> *dnode)
>       rcu_read_lock();
>       sc = tipc_service_find(net, type);
>       if (unlikely(!sc))
> -             goto not_found;
> +             goto exit;
> 
>       spin_lock_bh(&sc->lock);
> -     sr = tipc_service_first_range(sc, instance);
> -     if (unlikely(!sr))
> -             goto no_match;
> -
> -     /* Select lookup algorithm: local, closest-first or round-robin */
> -     if (*dnode == self) {
> -             list = &sr->local_publ;
> -             if (list_empty(list))
> -                     goto no_match;
> -             p = list_first_entry(list, struct publication, local_publ);
> -             list_move_tail(&p->local_publ, &sr->local_publ);
> -     } else if (legacy && !*dnode && !list_empty(&sr->local_publ)) {
> -             list = &sr->local_publ;
> -             p = list_first_entry(list, struct publication, local_publ);
> -             list_move_tail(&p->local_publ, &sr->local_publ);
> -     } else {
> -             list = &sr->all_publ;
> -             p = list_first_entry(list, struct publication, all_publ);
> -             list_move_tail(&p->all_publ, &sr->all_publ);
> +     service_range_foreach_match(sr, sc, instance, instance) {
> +             /* Select lookup algo: local, closest-first or round-robin */
> +             if (*dnode == self) {
> +                     list = &sr->local_publ;
> +                     if (list_empty(list))
> +                             continue;
> +                     p = list_first_entry(list, struct publication,
> +                                          local_publ);
> +                     list_move_tail(&p->local_publ, &sr->local_publ);
> +             } else if (legacy && !*dnode && !list_empty(&sr->local_publ)) {
> +                     list = &sr->local_publ;
> +                     p = list_first_entry(list, struct publication,
> +                                          local_publ);
> +                     list_move_tail(&p->local_publ, &sr->local_publ);
> +             } else {
> +                     list = &sr->all_publ;
> +                     p = list_first_entry(list, struct publication,
> +                                          all_publ);
> +                     list_move_tail(&p->all_publ, &sr->all_publ);
> +             }
> +             port = p->port;
> +             node = p->node;
> +             /* As for legacy, pick the first matching range only, a "true"
> +              * round-robin will be performed as needed.
> +              */
> +             break;
>       }
> -     port = p->port;
> -     node = p->node;
> -no_match:
>       spin_unlock_bh(&sc->lock);
> -not_found:
> +
> +exit:
>       rcu_read_unlock();
>       *dnode = node;
>       return port;
> @@ -517,7 +621,6 @@ void tipc_nametbl_mc_lookup(struct net *net, u32 type, 
> u32 lower, u32 upper,
>       struct service_range *sr;
>       struct tipc_service *sc;
>       struct publication *p;
> -     struct rb_node *n;
> 
>       rcu_read_lock();
>       sc = tipc_service_find(net, type);
> @@ -525,13 +628,7 @@ void tipc_nametbl_mc_lookup(struct net *net, u32 type, 
> u32 lower, u32 upper,
>               goto exit;
> 
>       spin_lock_bh(&sc->lock);
> -
> -     for (n = rb_first(&sc->ranges); n; n = rb_next(n)) {
> -             sr = container_of(n, struct service_range, tree_node);
> -             if (sr->upper < lower)
> -                     continue;
> -             if (sr->lower > upper)
> -                     break;
> +     service_range_foreach_match(sr, sc, lower, upper) {
>               list_for_each_entry(p, &sr->local_publ, local_publ) {
>                       if (p->scope == scope || (!exact && p->scope < scope))
>                               tipc_dest_push(dports, 0, p->port);
> @@ -552,7 +649,6 @@ void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 
> type, u32 lower,
>       struct service_range *sr;
>       struct tipc_service *sc;
>       struct publication *p;
> -     struct rb_node *n;
> 
>       rcu_read_lock();
>       sc = tipc_service_find(net, type);
> @@ -560,13 +656,7 @@ void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 
> type, u32 lower,
>               goto exit;
> 
>       spin_lock_bh(&sc->lock);
> -
> -     for (n = rb_first(&sc->ranges); n; n = rb_next(n)) {
> -             sr = container_of(n, struct service_range, tree_node);
> -             if (sr->upper < lower)
> -                     continue;
> -             if (sr->lower > upper)
> -                     break;
> +     service_range_foreach_match(sr, sc, lower, upper) {
>               list_for_each_entry(p, &sr->all_publ, all_publ) {
>                       tipc_nlist_add(nodes, p->node);
>               }
> @@ -764,7 +854,7 @@ static void tipc_service_delete(struct net *net, struct 
> tipc_service *sc)
>                       tipc_service_remove_publ(sr, p->node, p->key);
>                       kfree_rcu(p, rcu);
>               }
> -             rb_erase(&sr->tree_node, &sc->ranges);
> +             rb_erase_augmented(&sr->tree_node, &sc->ranges, &sr_callbacks);
>               kfree(sr);
>       }
>       hlist_del_init_rcu(&sc->service_list);
> --
> 2.13.7


_______________________________________________
tipc-discussion mailing list
tipc-discussion@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to