The current design of the binding table has an unnecessary memory
consuming and complex data structure. It aggregates the service
range items into an array, which is expanded by a factor two every
time it becomes too small to hold a new range item. Furthermore,
the arrays never shrink when the number of ranges diminishes.

We now replace this array with an RB tree that is holding the range
items as tree nodes, each range directly holding a circular list
of bindings.

This improves both volume and readability of the code, as well as
reducing memory consumption and hopefully improving cache hit rate.

Signed-off-by: Jon Maloy <[email protected]>
---
 net/tipc/binding_table.c | 759 +++++++++++++++++++++--------------------------
 net/tipc/link.c          |   2 +-
 net/tipc/subscr.h        |   2 +-
 3 files changed, 337 insertions(+), 426 deletions(-)

diff --git a/net/tipc/binding_table.c b/net/tipc/binding_table.c
index 7bb9ffc..521a581 100644
--- a/net/tipc/binding_table.c
+++ b/net/tipc/binding_table.c
@@ -47,25 +47,21 @@
 #include <net/genetlink.h>
 
 /**
- * struct name_info - name range publication info
- * @node_list: list of bindings on own node of this <type,lower,upper>
- * @all_bindings: list of all bindings of this <type,lower,upper>
- */
-struct name_info {
-       struct list_head local_bindings;
-       struct list_head all_bindings;
-};
-
-/**
  * struct service_range - container for all bindings of a service
  * @lower: name range lower bound
  * @upper: name range upper bound
- * @info: pointer to name range publication info
+ * @tree_node: member of service range RB tree
+ * @local_bindings: list of identical bindings made from this node
+ *   Used by closest_first lookup and multicast lookup algorithm
+ * @all_bindings: all bindings identical to this one, whatever node and scope
+ *   Used by round-robin lookup algorithm
  */
 struct service_range {
        u32 lower;
        u32 upper;
-       struct name_info *info;
+       struct rb_node tree_node;
+       struct list_head local_bindings;
+       struct list_head all_bindings;
 };
 
 /**
@@ -77,14 +73,12 @@ struct service_range {
  * @first_free: array index of first unused range entry
  * @service_list: links to adjacent name ranges in hash chain
  * @subscriptions: list of subscriptions for this service type
- * @lock: spinlock controlling access to publication lists of all ranges
+ * @lock: spinlock controlling access to pertaining service ranges and bindings
  * @rcu: RCU callback head used for deferred freeing
  */
 struct tipc_service {
        u32 type;
-       struct service_range *ranges;
-       u32 alloc;
-       u32 first_free;
+       struct rb_root ranges;
        struct hlist_node service_list;
        struct list_head subscriptions;
        spinlock_t lock; /* Covers service range list */
@@ -116,42 +110,32 @@ static struct tipc_binding *tipc_create_binding(u32 type, 
u32 lower, u32 upper,
        b->port = port;
        b->key = key;
        INIT_LIST_HEAD(&b->binding_sock);
+       INIT_LIST_HEAD(&b->binding_node);
+       INIT_LIST_HEAD(&b->local_bindings);
+       INIT_LIST_HEAD(&b->all_bindings);
        return b;
 }
 
 /**
- * tipc_srvrange_alloc - allocate a specified number of service range 
structures
- */
-static struct service_range *tipc_srvrange_alloc(u32 cnt)
-{
-       return kcalloc(cnt, sizeof(struct service_range), GFP_ATOMIC);
-}
-
-/**
  * tipc_service_create - create a service structure for the specified 'type'
  *
  * Allocates a single range structure and sets it to all 0's.
  */
-static struct tipc_service *tipc_service_create(u32 type,
-                                               struct hlist_head *service_head)
+static struct tipc_service *tipc_service_create(u32 type, struct hlist_head 
*hd)
 {
        struct tipc_service *service = kzalloc(sizeof(*service), GFP_ATOMIC);
-       struct service_range *sr = tipc_srvrange_alloc(1);
 
-       if (!service || !sr) {
-               pr_warn("Name range creation failed, no memory\n");
-               kfree(service);
-               kfree(sr);
+       if (!service) {
+               pr_warn("Service creation failed, no memory\n");
                return NULL;
        }
 
        spin_lock_init(&service->lock);
        service->type = type;
-       service->ranges = sr;
-       service->alloc = 1;
+       service->ranges = RB_ROOT;
        INIT_HLIST_NODE(&service->service_list);
        INIT_LIST_HEAD(&service->subscriptions);
-       hlist_add_head_rcu(&service->service_list, service_head);
+       hlist_add_head_rcu(&service->service_list, hd);
        return service;
 }
 
@@ -163,54 +147,51 @@ static struct tipc_service *tipc_service_create(u32 type,
 static struct service_range *tipc_service_find_range(struct tipc_service *sc,
                                                     u32 instance)
 {
-       struct service_range *ranges = sc->ranges;
-       int low = 0;
-       int high = sc->first_free - 1;
-       int mid;
-
-       while (low <= high) {
-               mid = (low + high) / 2;
-               if (instance < ranges[mid].lower)
-                       high = mid - 1;
-               else if (instance > ranges[mid].upper)
-                       low = mid + 1;
+       struct rb_node *n = sc->ranges.rb_node;
+       struct service_range *sr;
+
+       while (n) {
+               sr = container_of(n, struct service_range, tree_node);
+               if (sr->lower > instance)
+                       n = n->rb_left;
+               else if (sr->upper < instance)
+                       n = n->rb_right;
                else
-                       return &ranges[mid];
+                       return sr;
        }
        return NULL;
 }
 
-/**
- * tipc_service_locate_range - determine position of name instance in range
- *
- * Returns index in range array of the entry that contains the specified
- * instance value; if no entry contains that value, returns the position
- * where a new entry for it would be inserted in the array.
- *
- * Note: Similar to binary search code for locating a range.
- */
-static u32 tipc_service_locate_range(struct tipc_service *service, u32 
instance)
+static struct service_range *tipc_service_create_range(struct tipc_service *sc,
+                                                      u32 lower, u32 upper)
 {
-       struct service_range *ranges = service->ranges;
-       int low = 0;
-       int high = service->first_free - 1;
-       int mid;
-
-       while (low <= high) {
-               mid = (low + high) / 2;
-               if (instance < ranges[mid].lower)
-                       high = mid - 1;
-               else if (instance > ranges[mid].upper)
-                       low = mid + 1;
+       struct rb_node **n, *parent = NULL;
+       struct service_range *sr, *tmp;
+
+       n = &sc->ranges.rb_node;
+       while (*n) {
+               tmp = container_of(*n, struct service_range, tree_node);
+               parent = *n;
+               tmp = container_of(parent, struct service_range, tree_node);
+               if (lower < tmp->lower)
+                       n = &(*n)->rb_left;
+               else if (upper > tmp->upper)
+                       n = &(*n)->rb_right;
                else
-                       return mid;
+                       return NULL;
        }
-       return low;
+       sr = kzalloc(sizeof(*sr), GFP_ATOMIC);
+       if (!sr)
+               return NULL;
+       sr->lower = lower;
+       sr->upper = upper;
+       INIT_LIST_HEAD(&sr->local_bindings);
+       INIT_LIST_HEAD(&sr->all_bindings);
+       rb_link_node(&sr->tree_node, parent, n);
+       rb_insert_color(&sr->tree_node, &sc->ranges);
+       return sr;
 }
 
-/**
- * tipc_service_insert_binding
- */
 static struct tipc_binding *tipc_service_insert_binding(struct net *net,
                                                        struct tipc_service *sc,
                                                        u32 type, u32 lower,
@@ -218,93 +199,46 @@ static struct tipc_binding 
*tipc_service_insert_binding(struct net *net,
                                                        u32 node, u32 port,
                                                        u32 key)
 {
-       struct tipc_subscription *s;
-       struct tipc_subscription *st;
-       struct tipc_binding *b;
+       struct tipc_subscription *sub, *tmp;
        struct service_range *sr;
-       struct name_info *info;
-       int created_range = 0;
+       struct tipc_binding *b;
+       bool first = false;
 
        sr = tipc_service_find_range(sc, lower);
-       if (sr) {
-               /* Lower end overlaps existing entry => need an exact match */
-               if (sr->lower != lower || sr->upper != upper)
-                       return NULL;
-
-               info = sr->info;
-
-               /* Check if an identical binding already exists */
-               list_for_each_entry(b, &info->all_bindings, all_bindings) {
-                       if (b->port == port && b->key == key &&
-                           (!b->node || b->node == node))
-                               return NULL;
-               }
-       } else {
-               u32 inspos;
-               struct service_range *freesr;
-
-               /* Find where lower end should be inserted */
-               inspos = tipc_service_locate_range(sc, lower);
-
-               /* Fail if upper end overlaps into an existing entry */
-               if (inspos < sc->first_free &&
-                   upper >= sc->ranges[inspos].lower)
-                       return NULL;
-
-               /* Ensure there is space for new range */
-               if (sc->first_free == sc->alloc) {
-                       struct service_range *ranges =
-                               tipc_srvrange_alloc(sc->alloc * 2);
+       if (!sr) {
+               sr = tipc_service_create_range(sc, lower, upper);
+               if (!sr)
+                       goto  err;
+               first = true;
+       }
 
-                       if (!ranges) {
-                               pr_warn("Cannot publish {%u,%u,%u}, no 
memory\n",
-                                       type, lower, upper);
-                               return NULL;
-                       }
-                       memcpy(ranges, sc->ranges,
-                              sc->alloc * sizeof(*sr));
-                       kfree(sc->ranges);
-                       sc->ranges = ranges;
-                       sc->alloc *= 2;
-               }
+       /* Lower end overlaps existing entry, but we need an exact match */
+       if (sr->lower != lower || sr->upper != upper)
+               return NULL;
 
-               info = kzalloc(sizeof(*info), GFP_ATOMIC);
-               if (!info)
+       /* Return if the binding already exists */
+       list_for_each_entry(b, &sr->all_bindings, all_bindings) {
+               if (b->key == key && (!b->node || b->node == node))
                        return NULL;
-
-               INIT_LIST_HEAD(&info->local_bindings);
-               INIT_LIST_HEAD(&info->all_bindings);
-
-               /* Insert new range */
-               sr = &sc->ranges[inspos];
-               freesr = &sc->ranges[sc->first_free];
-               memmove(sr + 1, sr, (freesr - sr) * sizeof(*sr));
-               memset(sr, 0, sizeof(*sr));
-               sc->first_free++;
-               sr->lower = lower;
-               sr->upper = upper;
-               sr->info = info;
-               created_range = 1;
        }
 
-       /* Insert a binding */
+       /* Create and insert binding */
        b = tipc_create_binding(type, lower, upper, scope, node, port, key);
        if (!b)
-               return NULL;
-
-       list_add(&b->all_bindings, &info->all_bindings);
-
+               goto err;
        if (in_own_node(net, node))
-               list_add(&b->local_bindings, &info->local_bindings);
+               list_add(&b->local_bindings, &sr->local_bindings);
+       list_add(&b->all_bindings, &sr->all_bindings);
 
        /* Any subscriptions waiting for notification?  */
-       list_for_each_entry_safe(s, st, &sc->subscriptions, service_list) {
-               tipc_sub_report_overlap(s, b->lower, b->upper,
-                                       TIPC_PUBLISHED, b->port,
-                                       b->node, b->scope,
-                                       created_range);
+       list_for_each_entry_safe(sub, tmp, &sc->subscriptions, service_list) {
+               tipc_sub_report_overlap(sub, b->lower, b->upper, TIPC_PUBLISHED,
+                                       b->port, b->node, b->scope, first);
        }
        return b;
+err:
+       pr_warn("Failed to bind to %u,%u,%u, no memory\n", type, lower, upper);
+       return NULL;
 }
 
 /**
@@ -323,100 +257,93 @@ static struct tipc_binding 
*tipc_service_remove_binding(struct net *net,
                                                        u32 inst, u32 node,
                                                        u32 port, u32 key)
 {
-       struct service_range *sr = tipc_service_find_range(sc, inst);
-       struct name_info *info;
-       struct service_range *free;
-       struct tipc_subscription *s, *st;
+       struct tipc_subscription *sub, *tmp;
+       struct service_range *sr;
        struct tipc_binding *b;
-       int removed_range = 0;
+       bool found = false;
+       bool last = false;
 
+       sr = tipc_service_find_range(sc, inst);
        if (!sr)
                return NULL;
 
-       info = sr->info;
-
-       /* Locate binding, if it exists */
-       list_for_each_entry(b, &info->all_bindings, all_bindings) {
-               if (b->key == key && b->port == port &&
-                   (!b->node || b->node == node))
-                       goto found;
+       /* Find binding, if it exists */
+       list_for_each_entry(b, &sr->all_bindings, all_bindings) {
+               if (b->key != key || (node && node != b->node))
+                       continue;
+               found = true;
+               break;
        }
-       return NULL;
-found:
+       if (!found)
+               return NULL;
+
        list_del(&b->all_bindings);
-       if (in_own_node(net, node))
-               list_del(&b->local_bindings);
-
-       /* Contract range list if no more bindings for that range */
-       if (list_empty(&info->all_bindings)) {
-               kfree(info);
-               free = &sc->ranges[sc->first_free--];
-               memmove(sr, sr + 1, (free - (sr + 1)) * sizeof(*sr));
-               removed_range = 1;
+       list_del(&b->local_bindings);
+
+       /* Remove service range item if this was last binding for this range */
+       if (list_empty(&sr->all_bindings)) {
+               last = true;
+               rb_erase(&sr->tree_node, &sc->ranges);
+               kfree(sr);
        }
+
        /* Notify any waiting subscriptions */
-       list_for_each_entry_safe(s, st, &sc->subscriptions, service_list) {
-               tipc_sub_report_overlap(s, b->lower, b->upper,
-                                       TIPC_WITHDRAWN, b->port,
-                                       b->node, b->scope,
-                                       removed_range);
+       list_for_each_entry_safe(sub, tmp, &sc->subscriptions, service_list) {
+               tipc_sub_report_overlap(sub, b->lower, b->upper, TIPC_WITHDRAWN,
+                                       b->port, b->node, b->scope, last);
        }
        return b;
 }
 
 /**
  * tipc_service_subscribe - attach a subscription, and optionally
- * issue the prescribed number of events if there is any sub-
+ * issue the prescribed number of events if there is any service
  * range overlapping with the requested range
  */
 static void tipc_service_subscribe(struct tipc_service *service,
                                   struct tipc_subscription *sub)
 {
-       struct service_range *sr = service->ranges;
+       struct tipc_subscr *sb = &sub->evt.s;
+       struct service_range *sr;
        struct tipc_name_seq ns;
-       struct tipc_subscr *s = &sub->evt.s;
-       bool no_status;
+       struct tipc_binding *b;
+       struct rb_node *n;
+       bool first;
 
-       ns.type = tipc_sub_read(s, seq.type);
-       ns.lower = tipc_sub_read(s, seq.lower);
-       ns.upper = tipc_sub_read(s, seq.upper);
-       no_status = tipc_sub_read(s, filter) & TIPC_SUB_NO_STATUS;
+       ns.type = tipc_sub_read(sb, seq.type);
+       ns.lower = tipc_sub_read(sb, seq.lower);
+       ns.upper = tipc_sub_read(sb, seq.upper);
 
        tipc_sub_get(sub);
        list_add(&sub->service_list, &service->subscriptions);
 
-       if (no_status || !sr)
+       if (tipc_sub_read(sb, filter) & TIPC_SUB_NO_STATUS)
                return;
 
-       while (sr != &service->ranges[service->first_free]) {
-               if (tipc_sub_check_overlap(&ns, sr->lower, sr->upper)) {
-                       struct tipc_binding *crs;
-                       struct name_info *info = sr->info;
-                       int must_report = 1;
-
-                       list_for_each_entry(crs, &info->all_bindings,
-                                           all_bindings) {
-                               tipc_sub_report_overlap(sub, sr->lower,
-                                                       sr->upper,
-                                                       TIPC_PUBLISHED,
-                                                       crs->port,
-                                                       crs->node,
-                                                       crs->scope,
-                                                       must_report);
-                               must_report = 0;
-                       }
+       for (n = rb_first(&service->ranges); n; n = rb_next(n)) {
+               sr = container_of(n, struct service_range, tree_node);
+               if (sr->lower > ns.upper)
+                       break;
+               if (!tipc_sub_check_overlap(&ns, sr->lower, sr->upper))
+                       continue;
+               first = true;
+
+               list_for_each_entry(b, &sr->all_bindings, all_bindings) {
+                       tipc_sub_report_overlap(sub, sr->lower, sr->upper,
+                                               TIPC_PUBLISHED, b->port,
+                                               b->node, b->scope, first);
+                       first = false;
                }
-               sr++;
        }
 }
 
 static struct tipc_service *tipc_bindtbl_find_service(struct net *net, u32 
type)
 {
-       struct tipc_net *tn = tipc_net(net);
+       struct tipc_binding_table *bt = tipc_binding_table(net);
        struct hlist_head *service_head;
        struct tipc_service *service;
 
-       service_head = &tn->bindingtbl->services[hash(type)];
+       service_head = &bt->services[hash(type)];
        hlist_for_each_entry_rcu(service, service_head, service_list) {
                if (service->type == type)
                        return service;
@@ -429,26 +356,25 @@ struct tipc_binding *tipc_bindtbl_insert_binding(struct 
net *net, u32 type,
                                                 u32 scope, u32 node,
                                                 u32 port, u32 key)
 {
-       struct tipc_net *tn = tipc_net(net);
+       struct tipc_binding_table *bt = tipc_binding_table(net);
+       struct tipc_service *sc;
        struct tipc_binding *b;
-       struct tipc_service *service = tipc_bindtbl_find_service(net, type);
-       int index = hash(type);
 
        if (scope > TIPC_NODE_SCOPE || lower > upper) {
-               pr_debug("Failed to publish illegal {%u,%u,%u} with scope %u\n",
+               pr_debug("Failed to bind illegal {%u,%u,%u} with scope %u\n",
                         type, lower, upper, scope);
                return NULL;
        }
-       if (!service)
-               service = tipc_service_create(type,
-                                             &tn->bindingtbl->services[index]);
-       if (!service)
+       sc = tipc_bindtbl_find_service(net, type);
+       if (!sc)
+               sc = tipc_service_create(type, &bt->services[hash(type)]);
+       if (!sc)
                return NULL;
 
-       spin_lock_bh(&service->lock);
-       b = tipc_service_insert_binding(net, service, type, lower, upper,
+       spin_lock_bh(&sc->lock);
+       b = tipc_service_insert_binding(net, sc, type, lower, upper,
                                        scope, node, port, key);
-       spin_unlock_bh(&service->lock);
+       spin_unlock_bh(&sc->lock);
        return b;
 }
 
@@ -456,27 +382,26 @@ struct tipc_binding *tipc_bindtbl_remove_binding(struct 
net *net, u32 type,
                                                 u32 lower, u32 node, u32 port,
                                                 u32 key)
 {
-       struct tipc_binding *b;
-       struct tipc_service *service = tipc_bindtbl_find_service(net, type);
+       struct tipc_service *sc = tipc_bindtbl_find_service(net, type);
+       struct tipc_binding *b = NULL;
 
-       if (!service)
+       if (!sc)
                return NULL;
 
-       spin_lock_bh(&service->lock);
-       b = tipc_service_remove_binding(net, service, lower, node, port, key);
-       if (!service->first_free && list_empty(&service->subscriptions)) {
-               hlist_del_init_rcu(&service->service_list);
-               kfree(service->ranges);
-               spin_unlock_bh(&service->lock);
-               kfree_rcu(service, rcu);
-               return b;
+       spin_lock_bh(&sc->lock);
+       b = tipc_service_remove_binding(net, sc, lower, node, port, key);
+
+       /* Delete service item if this no more bindings and subscriptions */
+       if (RB_EMPTY_ROOT(&sc->ranges) && list_empty(&sc->subscriptions)) {
+               hlist_del_init_rcu(&sc->service_list);
+               kfree_rcu(sc, rcu);
        }
-       spin_unlock_bh(&service->lock);
+       spin_unlock_bh(&sc->lock);
        return b;
 }
 
 /**
- * tipc_bindtbl_translate - perform name translation
+ * tipc_bindtbl_translate - perform service instance to socket translation
  *
  * On entry, 'destnode' is the search domain used during translation.
  *
@@ -484,7 +409,7 @@ struct tipc_binding *tipc_bindtbl_remove_binding(struct net 
*net, u32 type,
  * - if name translation is deferred to another node/cluster/zone,
  *   leaves 'destnode' unchanged (will be non-zero) and returns 0
  * - if name translation is attempted and succeeds, sets 'destnode'
- *   to publishing node and returns port reference (will be non-zero)
+ *   to binding node and returns port reference (will be non-zero)
  * - if name translation is attempted and fails, sets 'destnode' to 0
  *   and returns 0
  */
@@ -495,9 +420,8 @@ u32 tipc_bindtbl_translate(struct net *net, u32 type, u32 
instance,
        bool legacy = tn->legacy_addr_format;
        u32 self = tipc_own_addr(net);
        struct service_range *sr;
-       struct name_info *info;
        struct tipc_binding *b;
-       struct tipc_service *service;
+       struct tipc_service *sc;
        u32 port = 0;
        u32 node = 0;
 
@@ -505,49 +429,49 @@ u32 tipc_bindtbl_translate(struct net *net, u32 type, u32 
instance,
                return 0;
 
        rcu_read_lock();
-       service = tipc_bindtbl_find_service(net, type);
-       if (unlikely(!service))
+       sc = tipc_bindtbl_find_service(net, type);
+       if (unlikely(!sc))
                goto not_found;
-       spin_lock_bh(&service->lock);
-       sr = tipc_service_find_range(service, instance);
+
+       spin_lock_bh(&sc->lock);
+       sr = tipc_service_find_range(sc, instance);
        if (unlikely(!sr))
                goto no_match;
-       info = sr->info;
 
        /* Closest-First Algorithm */
        if (legacy && !*destnode) {
-               if (!list_empty(&info->local_bindings)) {
-                       b = list_first_entry(&info->local_bindings,
+               if (!list_empty(&sr->local_bindings)) {
+                       b = list_first_entry(&sr->local_bindings,
                                             struct tipc_binding,
                                             local_bindings);
                        list_move_tail(&b->local_bindings,
-                                      &info->local_bindings);
+                                      &sr->local_bindings);
                } else {
-                       b = list_first_entry(&info->all_bindings,
+                       b = list_first_entry(&sr->all_bindings,
                                             struct tipc_binding,
                                             all_bindings);
                        list_move_tail(&b->all_bindings,
-                                      &info->all_bindings);
+                                      &sr->all_bindings);
                }
        }
 
        /* Round-Robin Algorithm */
-       else if (*destnode == tipc_own_addr(net)) {
-               if (list_empty(&info->local_bindings))
+       else if (*destnode == self) {
+               if (list_empty(&sr->local_bindings))
                        goto no_match;
-               b = list_first_entry(&info->local_bindings, struct tipc_binding,
+               b = list_first_entry(&sr->local_bindings, struct tipc_binding,
                                     local_bindings);
-               list_move_tail(&b->local_bindings, &info->local_bindings);
+               list_move_tail(&b->local_bindings, &sr->local_bindings);
        } else {
-               b = list_first_entry(&info->all_bindings, struct tipc_binding,
+               b = list_first_entry(&sr->all_bindings, struct tipc_binding,
                                     all_bindings);
-               list_move_tail(&b->all_bindings, &info->all_bindings);
+               list_move_tail(&b->all_bindings, &sr->all_bindings);
        }
 
        port = b->port;
        node = b->node;
 no_match:
-       spin_unlock_bh(&service->lock);
+       spin_unlock_bh(&sc->lock);
 not_found:
        rcu_read_unlock();
        *destnode = node;
@@ -559,34 +483,36 @@ bool tipc_bindtbl_lookup(struct net *net, u32 type, u32 
instance, u32 scope,
                         bool all)
 {
        u32 self = tipc_own_addr(net);
-       struct tipc_binding *b;
-       struct name_info *info;
-       struct tipc_service *service;
        struct service_range *sr;
+       struct tipc_service *sc;
+       struct tipc_binding *b;
 
        *dstcnt = 0;
        rcu_read_lock();
-       service = tipc_bindtbl_find_service(net, type);
-       if (unlikely(!service))
+       sc = tipc_bindtbl_find_service(net, type);
+       if (unlikely(!sc))
                goto exit;
-       spin_lock_bh(&service->lock);
-       sr = tipc_service_find_range(service, instance);
-       if (likely(sr)) {
-               info = sr->info;
-               list_for_each_entry(b, &info->all_bindings, all_bindings) {
-                       if (b->scope != scope)
-                               continue;
-                       if (b->port == exclude && b->node == self)
-                               continue;
-                       tipc_dest_push(dsts, b->node, b->port);
-                       (*dstcnt)++;
-                       if (all)
-                               continue;
-                       list_move_tail(&b->all_bindings, &info->all_bindings);
-                       break;
-               }
+
+       spin_lock_bh(&sc->lock);
+
+       sr = tipc_service_find_range(sc, instance);
+       if (!sr)
+               goto no_match;
+
+       list_for_each_entry(b, &sr->all_bindings, all_bindings) {
+               if (b->scope != scope)
+                       continue;
+               if (b->port == exclude && b->node == self)
+                       continue;
+               tipc_dest_push(dsts, b->node, b->port);
+               (*dstcnt)++;
+               if (all)
+                       continue;
+               list_move_tail(&b->all_bindings, &sr->all_bindings);
+               break;
        }
-       spin_unlock_bh(&service->lock);
+no_match:
+       spin_unlock_bh(&sc->lock);
 exit:
        rcu_read_unlock();
        return !list_empty(dsts);
@@ -595,61 +521,64 @@ bool tipc_bindtbl_lookup(struct net *net, u32 type, u32 
instance, u32 scope,
 void tipc_bindtbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper,
                            u32 scope, bool exact, struct list_head *dports)
 {
-       struct service_range *sr_stop;
-       struct name_info *info;
-       struct tipc_binding *p;
-       struct tipc_service *service;
        struct service_range *sr;
+       struct tipc_service *sc;
+       struct tipc_binding *b;
+       struct rb_node *n;
 
        rcu_read_lock();
-       service = tipc_bindtbl_find_service(net, type);
-       if (!service)
+       sc = tipc_bindtbl_find_service(net, type);
+       if (!sc)
                goto exit;
 
-       spin_lock_bh(&service->lock);
-       sr = service->ranges + tipc_service_locate_range(service, lower);
-       sr_stop = service->ranges + service->first_free;
-       for (; sr != sr_stop; sr++) {
+       spin_lock_bh(&sc->lock);
+
+       for (n = rb_first(&sc->ranges); n; n = rb_next(n)) {
+               sr = container_of(n, struct service_range, tree_node);
+               if (sr->upper < lower)
+                       continue;
                if (sr->lower > upper)
                        break;
-               info = sr->info;
-               list_for_each_entry(p, &info->local_bindings, local_bindings) {
-                       if (p->scope == scope || (!exact && p->scope < scope))
-                               tipc_dest_push(dports, 0, p->port);
+               list_for_each_entry(b, &sr->local_bindings, local_bindings) {
+                       if (b->scope == scope || (!exact && b->scope < scope))
+                               tipc_dest_push(dports, 0, b->port);
                }
        }
-       spin_unlock_bh(&service->lock);
+       spin_unlock_bh(&sc->lock);
 exit:
        rcu_read_unlock();
 }
 
 /* tipc_bindtbl_lookup_dst_nodes - find broadcast destination nodes
  * - Creates list of nodes that overlap the given multicast address
- * - Determines if any node local ports overlap
+ * - Determines if any node local destinations overlap
  */
 void tipc_bindtbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
                                   u32 upper, struct tipc_nlist *nodes)
 {
-       struct service_range *sr, *stop;
+       struct service_range *sr;
+       struct tipc_service *sc;
        struct tipc_binding *b;
-       struct name_info *info;
-       struct tipc_service *service;
+       struct rb_node *n;
 
        rcu_read_lock();
-       service = tipc_bindtbl_find_service(net, type);
-       if (!service)
+       sc = tipc_bindtbl_find_service(net, type);
+       if (!sc)
                goto exit;
 
-       spin_lock_bh(&service->lock);
-       sr = service->ranges + tipc_service_locate_range(service, lower);
-       stop = service->ranges + service->first_free;
-       for (; sr != stop && sr->lower <= upper; sr++) {
-               info = sr->info;
-               list_for_each_entry(b, &info->all_bindings, all_bindings) {
+       spin_lock_bh(&sc->lock);
+
+       for (n = rb_first(&sc->ranges); n; n = rb_next(n)) {
+               sr = container_of(n, struct service_range, tree_node);
+               if (sr->upper < lower)
+                       continue;
+               if (sr->lower > upper)
+                       break;
+               list_for_each_entry(b, &sr->all_bindings, all_bindings) {
                        tipc_nlist_add(nodes, b->node);
                }
        }
-       spin_unlock_bh(&service->lock);
+       spin_unlock_bh(&sc->lock);
 exit:
        rcu_read_unlock();
 }
@@ -659,87 +588,86 @@ void tipc_bindtbl_lookup_dst_nodes(struct net *net, u32 
type, u32 lower,
 void tipc_bindtbl_build_group(struct net *net, struct tipc_group *grp,
                              u32 type, u32 scope)
 {
-       struct service_range *sr, *stop;
-       struct name_info *info;
+       struct service_range *sr;
+       struct tipc_service *sc;
        struct tipc_binding *b;
-       struct tipc_service *service;
+       struct rb_node *n;
 
        rcu_read_lock();
-       service = tipc_bindtbl_find_service(net, type);
-       if (!service)
+       sc = tipc_bindtbl_find_service(net, type);
+       if (!sc)
                goto exit;
 
-       spin_lock_bh(&service->lock);
-       sr = service->ranges;
-       stop = service->ranges + service->first_free;
-       for (; sr != stop; sr++) {
-               info = sr->info;
-               list_for_each_entry(b, &info->all_bindings, all_bindings) {
+       spin_lock_bh(&sc->lock);
+       for (n = rb_first(&sc->ranges); n; n = rb_next(n)) {
+               sr = container_of(n, struct service_range, tree_node);
+               list_for_each_entry(b, &sr->local_bindings, local_bindings) {
                        if (b->scope != scope)
                                continue;
                        tipc_group_add_member(grp, b->node, b->port, b->lower);
                }
        }
-       spin_unlock_bh(&service->lock);
+       spin_unlock_bh(&sc->lock);
 exit:
        rcu_read_unlock();
 }
 
-/* tipc_bindtbl_publish - add name binding to network name tables
+/* tipc_bindtbl_bind - add service binding to binding table
  */
 struct tipc_binding *tipc_bindtbl_bind(struct net *net, u32 type, u32 lower,
-                                      u32 upper, u32 scope, u32 port,
-                                      u32 key)
+                                      u32 upper, u32 scope, u32 port, u32 key)
 {
-       struct tipc_binding *b;
-       struct sk_buff *buf = NULL;
+       struct tipc_binding_table *bt = tipc_binding_table(net);
        struct tipc_net *tn = tipc_net(net);
+       struct sk_buff *skb = NULL;
+       struct tipc_binding *b;
 
        spin_lock_bh(&tn->bindingtbl_lock);
-       if (tn->bindingtbl->local_bindings_count >= TIPC_MAX_PUBLICATIONS) {
-               pr_warn("Binding failed, local limit reached (%u)\n",
-                       TIPC_MAX_PUBLICATIONS);
-               spin_unlock_bh(&tn->bindingtbl_lock);
-               return NULL;
+
+       if (bt->local_bindings_count >= TIPC_MAX_BINDS) {
+               pr_warn("Bind failed, max limit %u reached\n", TIPC_MAX_BINDS);
+               goto exit;
        }
 
        b = tipc_bindtbl_insert_binding(net, type, lower, upper, scope,
                                        tipc_own_addr(net), port, key);
-       if (likely(b)) {
-               tn->bindingtbl->local_bindings_count++;
-               buf = tipc_bindist_publish(net, b);
+       if (b) {
+               bt->local_bindings_count++;
+               skb = tipc_bindist_publish(net, b);
                /* Any pending external events? */
                tipc_bindist_process_backlog(net);
        }
+exit:
        spin_unlock_bh(&tn->bindingtbl_lock);
 
-       if (buf)
-               tipc_node_broadcast(net, buf);
+       if (skb)
+               tipc_node_broadcast(net, skb);
        return b;
 }
 
 /**
- * tipc_bindtbl_unbind - withdraw name binding from network name tables
+ * tipc_bindtbl_unbind - withdraw a service binding
  */
 int tipc_bindtbl_unbind(struct net *net, u32 type, u32 lower, u32 port, u32 
key)
 {
-       struct tipc_binding *b;
-       struct sk_buff *skb = NULL;
+       struct tipc_binding_table *bt = tipc_binding_table(net);
        struct tipc_net *tn = tipc_net(net);
+       u32 self = tipc_own_addr(net);
+       struct sk_buff *skb = NULL;
+       struct tipc_binding *b;
 
        spin_lock_bh(&tn->bindingtbl_lock);
-       b = tipc_bindtbl_remove_binding(net, type, lower, tipc_own_addr(net),
-                                       port, key);
-       if (likely(b)) {
-               tn->bindingtbl->local_bindings_count--;
+
+       b = tipc_bindtbl_remove_binding(net, type, lower, self, port, key);
+       if (b) {
+               bt->local_bindings_count--;
                skb = tipc_bindist_withdraw(net, b);
                /* Any pending external events? */
                tipc_bindist_process_backlog(net);
                list_del_init(&b->binding_sock);
                kfree_rcu(b, rcu);
        } else {
-               pr_err("Unable to remove local binding\n"
-                      "(type=%u, lower=%u, port=%u, key=%u)\n",
+               pr_err("Failed to remove local binding {%u,%u,%u}/%u\n",
                       type, lower, port, key);
        }
        spin_unlock_bh(&tn->bindingtbl_lock);
@@ -756,29 +684,24 @@ int tipc_bindtbl_unbind(struct net *net, u32 type, u32 
lower, u32 port, u32 key)
  */
 void tipc_bindtbl_subscribe(struct tipc_subscription *sub)
 {
+       struct tipc_binding_table *bt = tipc_binding_table(sub->net);
        struct tipc_net *tn = tipc_net(sub->net);
        struct tipc_subscr *s = &sub->evt.s;
        u32 type = tipc_sub_read(s, seq.type);
-       int index = hash(type);
-       struct tipc_service *service;
-       struct tipc_name_seq ns;
+       struct tipc_service *sc;
 
        spin_lock_bh(&tn->bindingtbl_lock);
-       service = tipc_bindtbl_find_service(sub->net, type);
-       if (!service)
-               service = tipc_service_create(type,
-                                             &tn->bindingtbl->services[index]);
-
-       if (service) {
-               spin_lock_bh(&service->lock);
-               tipc_service_subscribe(service, sub);
-               spin_unlock_bh(&service->lock);
+       sc = tipc_bindtbl_find_service(sub->net, type);
+       if (!sc)
+               sc = tipc_service_create(type, &bt->services[hash(type)]);
+       if (sc) {
+               spin_lock_bh(&sc->lock);
+               tipc_service_subscribe(sc, sub);
+               spin_unlock_bh(&sc->lock);
        } else {
-               ns.type = tipc_sub_read(s, seq.type);
-               ns.lower = tipc_sub_read(s, seq.lower);
-               ns.upper = tipc_sub_read(s, seq.upper);
-               pr_warn("Failed to create subscription for {%u,%u,%u}\n",
-                       ns.type, ns.lower, ns.upper);
+               pr_warn("Failed to subscribe for {%u,%u,%u}\n", type,
+                       tipc_sub_read(s, seq.lower),
+                       tipc_sub_read(s, seq.upper));
        }
        spin_unlock_bh(&tn->bindingtbl_lock);
 }
@@ -788,27 +711,27 @@ void tipc_bindtbl_subscribe(struct tipc_subscription *sub)
  */
 void tipc_bindtbl_unsubscribe(struct tipc_subscription *sub)
 {
-       struct tipc_subscr *s = &sub->evt.s;
        struct tipc_net *tn = tipc_net(sub->net);
-       struct tipc_service *service;
+       struct tipc_subscr *s = &sub->evt.s;
        u32 type = tipc_sub_read(s, seq.type);
+       struct tipc_service *sc;
 
        spin_lock_bh(&tn->bindingtbl_lock);
-       service = tipc_bindtbl_find_service(sub->net, type);
-       if (service) {
-               spin_lock_bh(&service->lock);
-               list_del_init(&sub->service_list);
-               tipc_sub_put(sub);
-               if (!service->first_free &&
-                   list_empty(&service->subscriptions)) {
-                       hlist_del_init_rcu(&service->service_list);
-                       kfree(service->ranges);
-                       spin_unlock_bh(&service->lock);
-                       kfree_rcu(service, rcu);
-               } else {
-                       spin_unlock_bh(&service->lock);
-               }
+       sc = tipc_bindtbl_find_service(sub->net, type);
+       if (!sc)
+               goto exit;
+
+       spin_lock_bh(&sc->lock);
+       list_del_init(&sub->service_list);
+       tipc_sub_put(sub);
+
+       /* Delete service item if no more bindings and subscriptions */
+       if (RB_EMPTY_ROOT(&sc->ranges) && list_empty(&sc->subscriptions)) {
+               hlist_del_init_rcu(&sc->service_list);
+               kfree_rcu(sc, rcu);
        }
+       spin_unlock_bh(&sc->lock);
+exit:
        spin_unlock_bh(&tn->bindingtbl_lock);
 }
 
@@ -833,38 +756,34 @@ int tipc_bindtbl_init(struct net *net)
 }
 
 /**
- * tipc_purge_bindings - remove all bindings for a given type
- *
- * tipc_bindtbl_lock must be held when calling this function
+ *  tipc_purge_bindings - remove all bindings for a given service
  */
-static void tipc_purge_bindings(struct net *net, struct tipc_service *service)
+static void tipc_purge_bindings(struct net *net, struct tipc_service *sc)
 {
-       struct tipc_binding *b, *safe;
-       struct service_range *sr;
-       struct name_info *info;
-
-       spin_lock_bh(&service->lock);
-       sr = service->ranges;
-       info = sr->info;
-       list_for_each_entry_safe(b, safe, &info->all_bindings, all_bindings) {
-               tipc_service_remove_binding(net, service, b->lower, b->node,
-                                           b->port, b->key);
-               kfree_rcu(b, rcu);
+       struct service_range *sr, *tmpr;
+       struct tipc_binding *b, *tmpb;
+
+       spin_lock_bh(&sc->lock);
+       rbtree_postorder_for_each_entry_safe(sr, tmpr, &sc->ranges, tree_node) {
+               list_for_each_entry_safe(b, tmpb,
+                                        &sr->all_bindings, all_bindings) {
+                       tipc_service_remove_binding(net, sc, b->lower, b->node,
+                                                   b->port, b->key);
+                       kfree_rcu(b, rcu);
+               }
        }
-       hlist_del_init_rcu(&service->service_list);
-       kfree(service->ranges);
-       spin_unlock_bh(&service->lock);
-
-       kfree_rcu(service, rcu);
+       hlist_del_init_rcu(&sc->service_list);
+       spin_unlock_bh(&sc->lock);
+       kfree_rcu(sc, rcu);
 }
 
 void tipc_bindtbl_stop(struct net *net)
 {
-       u32 i;
-       struct tipc_service *service;
-       struct hlist_head *service_head;
+       struct tipc_binding_table *bt = tipc_binding_table(net);
        struct tipc_net *tn = tipc_net(net);
-       struct tipc_binding_table *bt = tn->bindingtbl;
+       struct hlist_head *service_head;
+       struct tipc_service *service;
+       u32 i;
 
        /* Verify name table is empty and purge any lingering
         * bindings, then release the name table
@@ -887,27 +806,27 @@ void tipc_bindtbl_stop(struct net *net)
 static int __tipc_nl_add_nametable_binding(struct tipc_nl_msg *msg,
                                           struct tipc_service *service,
                                           struct service_range *sr,
-                                          u32 *last_binding)
+                                          u32 *last_key)
 {
-       void *hdr;
+       struct tipc_binding *p;
        struct nlattr *attrs;
        struct nlattr *b;
-       struct tipc_binding *p;
+       void *hdr;
 
-       if (*last_binding) {
-               list_for_each_entry(p, &sr->info->all_bindings, all_bindings)
-                       if (p->key == *last_binding)
+       if (*last_key) {
+               list_for_each_entry(p, &sr->all_bindings, all_bindings)
+                       if (p->key == *last_key)
                                break;
-               if (p->key != *last_binding)
+               if (p->key != *last_key)
                        return -EPIPE;
        } else {
-               p = list_first_entry(&sr->info->all_bindings,
+               p = list_first_entry(&sr->all_bindings,
                                     struct tipc_binding,
                                     all_bindings);
        }
 
-       list_for_each_entry_from(p, &sr->info->all_bindings, all_bindings) {
-               *last_binding = p->key;
+       list_for_each_entry_from(p, &sr->all_bindings, all_bindings) {
+               *last_key = p->key;
 
                hdr = genlmsg_put(msg->skb, msg->portid, msg->seq,
                                  &tipc_genl_family, NLM_F_MULTI,
@@ -942,7 +861,7 @@ static int __tipc_nl_add_nametable_binding(struct 
tipc_nl_msg *msg,
                nla_nest_end(msg->skb, attrs);
                genlmsg_end(msg->skb, hdr);
        }
-       *last_binding = 0;
+       *last_key = 0;
 
        return 0;
 
@@ -957,41 +876,33 @@ static int __tipc_nl_add_nametable_binding(struct 
tipc_nl_msg *msg,
 }
 
 static int __tipc_nl_srvrange_list(struct tipc_nl_msg *msg,
-                                  struct tipc_service *service,
-                                  u32 *last_lower, u32 *last_binding)
+                                  struct tipc_service *sc,
+                                  u32 *last_lower, u32 *last_key)
 {
        struct service_range *sr;
-       struct service_range *sr_start;
+       struct rb_node *n;
        int err;
 
-       if (*last_lower) {
-               sr_start = tipc_service_find_range(service, *last_lower);
-               if (!sr_start)
-                       return -EPIPE;
-       } else {
-               sr_start = service->ranges;
-       }
-
-       for (sr = sr_start; sr != &service->ranges[service->first_free]; sr++) {
-               err = __tipc_nl_add_nametable_binding(msg, service, sr,
-                                                     last_binding);
+       for (n = rb_first(&sc->ranges); n; n = rb_next(n)) {
+               sr = container_of(n, struct service_range, tree_node);
+               if (sr->lower < *last_lower)
+                       continue;
+               err = __tipc_nl_add_nametable_binding(msg, sc, sr, last_key);
                if (err) {
                        *last_lower = sr->lower;
                        return err;
                }
        }
        *last_lower = 0;
-
        return 0;
 }
 
 static int tipc_nl_service_list(struct net *net, struct tipc_nl_msg *msg,
-                               u32 *last_type, u32 *last_lower,
-                               u32 *last_binding)
+                               u32 *last_type, u32 *last_lower, u32 *last_key)
 {
        struct tipc_net *tn = tipc_net(net);
-       struct hlist_head *head;
        struct tipc_service *service = NULL;
+       struct hlist_head *head;
        int err;
        int i;
 
@@ -1017,7 +928,7 @@ static int tipc_nl_service_list(struct net *net, struct 
tipc_nl_msg *msg,
                hlist_for_each_entry_from_rcu(service, service_list) {
                        spin_lock_bh(&service->lock);
                        err = __tipc_nl_srvrange_list(msg, service, last_lower,
-                                                     last_binding);
+                                                     last_key);
 
                        if (err) {
                                *last_type = service->type;
@@ -1033,13 +944,13 @@ static int tipc_nl_service_list(struct net *net, struct 
tipc_nl_msg *msg,
 
 int tipc_nl_bindtbl_dump(struct sk_buff *skb, struct netlink_callback *cb)
 {
-       int err;
-       int done = cb->args[3];
+       struct net *net = sock_net(skb->sk);
        u32 last_type = cb->args[0];
        u32 last_lower = cb->args[1];
-       u32 last_binding = cb->args[2];
-       struct net *net = sock_net(skb->sk);
+       u32 last_key = cb->args[2];
+       int done = cb->args[3];
        struct tipc_nl_msg msg;
+       int err;
 
        if (done)
                return 0;
@@ -1050,7 +961,7 @@ int tipc_nl_bindtbl_dump(struct sk_buff *skb, struct 
netlink_callback *cb)
 
        rcu_read_lock();
        err = tipc_nl_service_list(net, &msg, &last_type,
-                                  &last_lower, &last_binding);
+                                  &last_lower, &last_key);
        if (!err) {
                done = 1;
        } else if (err != -EMSGSIZE) {
@@ -1066,7 +977,7 @@ int tipc_nl_bindtbl_dump(struct sk_buff *skb, struct 
netlink_callback *cb)
 
        cb->args[0] = last_type;
        cb->args[1] = last_lower;
-       cb->args[2] = last_binding;
+       cb->args[2] = last_key;
        cb->args[3] = done;
 
        return skb->len;
diff --git a/net/tipc/link.c b/net/tipc/link.c
index cbb87fc..a8983af 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1810,7 +1810,7 @@ int tipc_link_bc_nack_rcv(struct tipc_link *l, struct 
sk_buff *skb,
 
 void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
 {
-       int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE);
+       int max_bulk = TIPC_MAX_BINDS / (l->mtu / ITEM_SIZE);
 
        l->window = win;
        l->backlog[TIPC_LOW_IMPORTANCE].limit      = max_t(u16, 50, win);
diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
index 24a720b..ca8d9d9 100644
--- a/net/tipc/subscr.h
+++ b/net/tipc/subscr.h
@@ -40,7 +40,7 @@
 #include "topsrv.h"
 
 #define TIPC_MAX_SUBSCR         65535
-#define TIPC_MAX_PUBLICATIONS   65535
+#define TIPC_MAX_BINDS          65535
 
 struct tipc_subscription;
 struct tipc_conn;
-- 
2.1.4


------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, Slashdot.org! http://sdm.link/slashdot
_______________________________________________
tipc-discussion mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to