With the new RB tree structure for service ranges it becomes possible
to solve an old problem: we can now allow overlapping service ranges
in the table.

When inserting a new service range into the tree, we use 'lower' as
the primary key and, when necessary, 'upper' as the secondary key.

This guarantees that a well-formed binding item from a peer will never
be rejected, and makes it possible to eliminate the problematic backlog
functionality we currently have for handling such cases.

Signed-off-by: Jon Maloy <[email protected]>
---
 net/tipc/binding_distr.c | 102 +++++++++++------------------------------------
 net/tipc/binding_table.c |  74 +++++++++++++++-------------------
 net/tipc/binding_table.h |   7 ++--
 net/tipc/net.c           |   2 +-
 net/tipc/node.c          |   4 +-
 net/tipc/socket.c        |   4 +-
 6 files changed, 65 insertions(+), 128 deletions(-)

diff --git a/net/tipc/binding_distr.c b/net/tipc/binding_distr.c
index 207ff87..961f50b 100644
--- a/net/tipc/binding_distr.c
+++ b/net/tipc/binding_distr.c
@@ -194,32 +194,21 @@ void tipc_bindist_node_up(struct net *net, u32 dnode)
 }
 
 /**
- * tipc_publ_purge - remove publication associated with a failed node
+ * tipc_purge_binding - remove binding associated with a lost peer node
  *
  * Invoked for each publication issued by a newly failed node.
  * Removes publication structure from name table & deletes it.
  */
-static void tipc_purge_binding(struct net *net, struct tipc_binding *b,
-                              u32 addr)
+static void tipc_purge_binding(struct net *net, struct tipc_binding *b)
 {
-       struct tipc_net *tn = net_generic(net, tipc_net_id);
-       struct tipc_binding *p;
+       struct tipc_net *tn = tipc_net(net);
 
        spin_lock_bh(&tn->bindingtbl_lock);
-       p = tipc_bindtbl_remove_binding(net, b->type, b->lower,
-                                       b->node, b->port, b->key);
-       if (p)
-               tipc_node_unsubscribe(net, &p->binding_node, addr);
+       tipc_bindtbl_remove_binding(net, b->type, b->lower,
+                                   b->upper, b->node, b->key);
+       tipc_node_unsubscribe(net, &b->binding_node, b->node);
        spin_unlock_bh(&tn->bindingtbl_lock);
-
-       if (p != b) {
-               pr_err("Unable to remove publication from failed node\n"
-                      " (type=%u, lower=%u, node=0x%x, port=%u, key=%u)\n",
-                      b->type, b->lower, b->node, b->port,
-                      b->key);
-       }
-
-       kfree_rcu(p, rcu);
+       kfree_rcu(b, rcu);
 }
 
 /**
@@ -245,13 +234,12 @@ void tipc_publ_notify(struct net *net, struct list_head 
*nsub_list, u32 addr)
        struct tipc_binding *b, *tmp;
 
        list_for_each_entry_safe(b, tmp, nsub_list, binding_node)
-               tipc_purge_binding(net, b, addr);
+               tipc_purge_binding(net, b);
        tipc_dist_queue_purge(net, addr);
 }
 
 /**
- * tipc_update_nametbl - try to process a nametable update and notify
- *                      subscribers
+ * tipc_update_nametbl - process a nametable update and notify subscribers
  *
  * tipc_bindtbl_lock must be held.
  * Returns the publication item if successful, otherwise NULL.
@@ -260,27 +248,32 @@ static bool tipc_update_nametbl(struct net *net, struct 
distr_item *i,
                                u32 node, u32 dtype)
 {
        struct tipc_binding *b = NULL;
+       u32 lower = ntohl(i->lower);
+       u32 upper = ntohl(i->upper);
+       u32 type = ntohl(i->type);
+       u32 port = ntohl(i->port);
+       u32 key = ntohl(i->key);
 
        if (dtype == PUBLICATION) {
-               b = tipc_bindtbl_insert_binding(net, ntohl(i->type),
-                                               ntohl(i->lower),
-                                               ntohl(i->upper),
+               b = tipc_bindtbl_insert_binding(net, type, lower, upper,
                                                TIPC_CLUSTER_SCOPE, node,
-                                               ntohl(i->port), ntohl(i->key));
+                                               port, key);
                if (b) {
                        tipc_node_subscribe(net, &b->binding_node, node);
                        return true;
                }
+               pr_warn_ratelimited("Failed to add binding %u,%u,%u from %x\n",
+                                   type, lower, upper, node);
        } else if (dtype == WITHDRAWAL) {
-               b = tipc_bindtbl_remove_binding(net, ntohl(i->type),
-                                               ntohl(i->lower),
-                                               node, ntohl(i->port),
-                                               ntohl(i->key));
+               b = tipc_bindtbl_remove_binding(net, type, lower,
+                                               upper, node, key);
                if (b) {
                        tipc_node_unsubscribe(net, &b->binding_node, node);
                        kfree_rcu(b, rcu);
                        return true;
                }
+               pr_warn_ratelimited("Failed to remove binding %u,%u from %x\n",
+                                   type, lower, node);
        } else {
                pr_warn("Unrecognized name table message received\n");
        }
@@ -288,53 +281,6 @@ static bool tipc_update_nametbl(struct net *net, struct 
distr_item *i,
 }
 
 /**
- * tipc_bindist_add_backlog - add a failed name table update to the backlog
- *
- */
-static void tipc_bindist_add_backlog(struct net *net, struct distr_item *i,
-                                    u32 type, u32 node)
-{
-       struct distr_queue_item *e;
-       struct tipc_net *tn = net_generic(net, tipc_net_id);
-       unsigned long now = get_jiffies_64();
-
-       e = kzalloc(sizeof(*e), GFP_ATOMIC);
-       if (!e)
-               return;
-       e->dtype = type;
-       e->node = node;
-       e->expires = now + msecs_to_jiffies(sysctl_tipc_bindist_timeout);
-       memcpy(e, i, sizeof(*i));
-       list_add_tail(&e->next, &tn->dist_queue);
-}
-
-/**
- * tipc_bindist_process_backlog - try to process any pending name table updates
- * from the network.
- */
-void tipc_bindist_process_backlog(struct net *net)
-{
-       struct distr_queue_item *e, *tmp;
-       struct tipc_net *tn = net_generic(net, tipc_net_id);
-       unsigned long now = get_jiffies_64();
-
-       list_for_each_entry_safe(e, tmp, &tn->dist_queue, next) {
-               if (time_after(e->expires, now)) {
-                       if (!tipc_update_nametbl(net, &e->i, e->node, e->dtype))
-                               continue;
-               } else {
-                       pr_warn_ratelimited("Dropping name table update (%d) of 
{%u, %u, %u} from %x key=%u\n",
-                                           e->dtype, ntohl(e->i.type),
-                                           ntohl(e->i.lower),
-                                           ntohl(e->i.upper),
-                                           e->node, ntohl(e->i.key));
-               }
-               list_del(&e->next);
-               kfree(e);
-       }
-}
-
-/**
  * tipc_bindist_rcv - process name table update messages sent by another node
  */
 void tipc_bindist_rcv(struct net *net, struct sk_buff_head *inputq)
@@ -356,12 +302,10 @@ void tipc_bindist_rcv(struct net *net, struct 
sk_buff_head *inputq)
                count = msg_data_sz(hdr) / ITEM_SIZE;
                node = msg_orignode(hdr);
                while (count--) {
-                       if (!tipc_update_nametbl(net, item, node, mtyp))
-                               tipc_bindist_add_backlog(net, item, mtyp, node);
+                       tipc_update_nametbl(net, item, node, mtyp);
                        item++;
                }
                kfree_skb(skb);
-               tipc_bindist_process_backlog(net);
        }
        spin_unlock_bh(&tn->bindingtbl_lock);
 }
diff --git a/net/tipc/binding_table.c b/net/tipc/binding_table.c
index 0203297..81a2de1 100644
--- a/net/tipc/binding_table.c
+++ b/net/tipc/binding_table.c
@@ -118,8 +118,6 @@ static struct tipc_binding *tipc_create_binding(u32 type, 
u32 lower, u32 upper,
 
 /**
  * tipc_service_create - create a service structure for the specified 'type'
- *
- * Allocates a single range structure and sets it to all 0's.
  */
 static struct tipc_service *tipc_service_create(u32 type, struct hlist_head 
*hd)
 {
@@ -141,8 +139,7 @@ static struct tipc_service *tipc_service_create(u32 type, 
struct hlist_head *hd)
 
 /**
  * tipc_service_find_range - find service range matching a service instance
- *
- * Very time-critical, so binary searches through range array.
+ * Very time-critical
  */
 static struct service_range *tipc_service_find_range(struct tipc_service *sc,
                                                     u32 instance)
@@ -175,10 +172,14 @@ static struct service_range 
*tipc_service_create_range(struct tipc_service *sc,
                tmp = container_of(parent, struct service_range, tree_node);
                if (lower < tmp->lower)
                        n = &(*n)->rb_left;
+               else if (lower > tmp->lower)
+                       n = &(*n)->rb_right;
+               else if (upper < tmp->upper)
+                       n = &(*n)->rb_left;
                else if (upper > tmp->upper)
                        n = &(*n)->rb_right;
                else
-                       return NULL;
+                       return tmp;
        }
        sr = kzalloc(sizeof(*sr), GFP_ATOMIC);
        if (!sr)
@@ -204,17 +205,12 @@ static struct tipc_binding 
*tipc_service_insert_binding(struct net *net,
        struct tipc_binding *b;
        bool first = false;
 
-       sr = tipc_service_find_range(sc, lower);
-       if (!sr) {
-               sr = tipc_service_create_range(sc, lower, upper);
-               if (!sr)
-                       goto  err;
-               first = true;
-       }
+       /* Create range if not already existing */
+       sr = tipc_service_create_range(sc, lower, upper);
+       if (!sr)
+               goto  err;
 
-       /* Lower end overlaps existing entry, but we need an exact match */
-       if (sr->lower != lower || sr->upper != upper)
-               return NULL;
+       first = list_empty(&sr->all_bindings);
 
        /* Return if the binding already exists */
        list_for_each_entry(b, &sr->all_bindings, all_bindings) {
@@ -241,32 +237,31 @@ static struct tipc_binding 
*tipc_service_insert_binding(struct net *net,
        return NULL;
 }
 
-/**
- * tipc_service_remove_binding
- *
- * NOTE: There may be cases where TIPC is asked to remove a binding
- * that is not in the name table.  For example, if another node issues a
- * binding for a name range that overlaps an existing name range
- * the binding will not be recorded, which means the binding won't
- * be found when the name range is later withdrawn by that node.
- * A failed withdraw request simply returns a failure indication and lets the
- * caller issue any error or warning messages associated with such a problem.
- */
 static struct tipc_binding *tipc_service_remove_binding(struct net *net,
                                                        struct tipc_service *sc,
-                                                       u32 inst, u32 node,
-                                                       u32 port, u32 key)
+                                                       u32 lower, u32 upper,
+                                                       u32 node, u32 key)
 {
        struct tipc_subscription *sub, *tmp;
        struct service_range *sr;
        struct tipc_binding *b;
        bool found = false;
        bool last = false;
+       struct rb_node *n;
 
-       sr = tipc_service_find_range(sc, inst);
+       sr = tipc_service_find_range(sc, lower);
        if (!sr)
                return NULL;
 
+       /* Find exact matching service range */
+       for (n = &sr->tree_node; n; n = rb_next(n)) {
+               sr = container_of(n, struct service_range, tree_node);
+               if (sr->upper == upper)
+                       break;
+       }
+       if (!n || sr->lower != lower || sr->upper != upper)
+               return NULL;
+
        /* Find binding, if it exists */
        list_for_each_entry(b, &sr->all_bindings, all_bindings) {
                if (b->key != key || (node && node != b->node))
@@ -379,8 +374,8 @@ struct tipc_binding *tipc_bindtbl_insert_binding(struct net 
*net, u32 type,
 }
 
 struct tipc_binding *tipc_bindtbl_remove_binding(struct net *net, u32 type,
-                                                u32 lower, u32 node, u32 port,
-                                                u32 key)
+                                                u32 lower, u32 upper,
+                                                u32 node, u32 key)
 {
        struct tipc_service *sc = tipc_bindtbl_find_service(net, type);
        struct tipc_binding *b = NULL;
@@ -389,7 +384,7 @@ struct tipc_binding *tipc_bindtbl_remove_binding(struct net 
*net, u32 type,
                return NULL;
 
        spin_lock_bh(&sc->lock);
-       b = tipc_service_remove_binding(net, sc, lower, node, port, key);
+       b = tipc_service_remove_binding(net, sc, lower, upper, node, key);
 
        /* Delete service item if this no more bindings and subscriptions */
        if (RB_EMPTY_ROOT(&sc->ranges) && list_empty(&sc->subscriptions)) {
@@ -623,8 +618,6 @@ struct tipc_binding *tipc_bindtbl_bind(struct net *net, u32 
type, u32 lower,
        if (b) {
                bt->local_bindings_count++;
                skb = tipc_bindist_publish(net, b);
-               /* Any pending external events? */
-               tipc_bindist_process_backlog(net);
        }
 exit:
        spin_unlock_bh(&tn->bindingtbl_lock);
@@ -637,7 +630,8 @@ struct tipc_binding *tipc_bindtbl_bind(struct net *net, u32 
type, u32 lower,
 /**
  * tipc_bindtbl_unbind - withdraw a service binding
  */
-int tipc_bindtbl_unbind(struct net *net, u32 type, u32 lower, u32 port, u32 
key)
+int tipc_bindtbl_unbind(struct net *net, u32 type, u32 lower,
+                       u32 upper, u32 key)
 {
        struct tipc_binding_table *bt = tipc_binding_table(net);
        struct tipc_net *tn = tipc_net(net);
@@ -647,17 +641,15 @@ int tipc_bindtbl_unbind(struct net *net, u32 type, u32 
lower, u32 port, u32 key)
 
        spin_lock_bh(&tn->bindingtbl_lock);
 
-       b = tipc_bindtbl_remove_binding(net, type, lower, self, port, key);
+       b = tipc_bindtbl_remove_binding(net, type, lower, upper, self, key);
        if (b) {
                bt->local_bindings_count--;
                skb = tipc_bindist_withdraw(net, b);
-               /* Any pending external events? */
-               tipc_bindist_process_backlog(net);
                list_del_init(&b->binding_sock);
                kfree_rcu(b, rcu);
        } else {
                pr_err("Failed to remove local binding {%u,%u,%u}/%u\n",
-                      type, lower, port, key);
+                      type, lower, upper, key);
        }
        spin_unlock_bh(&tn->bindingtbl_lock);
 
@@ -756,8 +748,8 @@ static void tipc_purge_bindings(struct net *net, struct 
tipc_service *sc)
        rbtree_postorder_for_each_entry_safe(sr, tmpr, &sc->ranges, tree_node) {
                list_for_each_entry_safe(b, tmpb,
                                         &sr->all_bindings, all_bindings) {
-                       tipc_service_remove_binding(net, sc, b->lower, b->node,
-                                                   b->port, b->key);
+                       tipc_service_remove_binding(net, sc, b->lower, b->upper,
+                                                   b->node, b->key);
                        kfree_rcu(b, rcu);
                }
        }
diff --git a/net/tipc/binding_table.h b/net/tipc/binding_table.h
index 2d9f156..da57137 100644
--- a/net/tipc/binding_table.h
+++ b/net/tipc/binding_table.h
@@ -115,14 +115,15 @@ bool tipc_bindtbl_lookup(struct net *net, u32 type, u32 
instance, u32 domain,
 struct tipc_binding *tipc_bindtbl_bind(struct net *net, u32 type, u32 lower,
                                       u32 upper, u32 scope, u32 port_ref,
                                       u32 key);
-int tipc_bindtbl_unbind(struct net *net, u32 type, u32 lower, u32 ref, u32 
key);
+int tipc_bindtbl_unbind(struct net *net, u32 type,
+                       u32 lower, u32 upper, u32 key);
 struct tipc_binding *tipc_bindtbl_insert_binding(struct net *net, u32 type,
                                                 u32 lower, u32 upper,
                                                 u32 scope, u32 node, u32 ref,
                                                 u32 key);
 struct tipc_binding *tipc_bindtbl_remove_binding(struct net *net, u32 type,
-                                                u32 lower, u32 node, u32 ref,
-                                                u32 key);
+                                                u32 lower, u32 upper,
+                                                u32 node, u32 key);
 void tipc_bindtbl_subscribe(struct tipc_subscription *s);
 void tipc_bindtbl_unsubscribe(struct tipc_subscription *s);
 int tipc_bindtbl_init(struct net *net);
diff --git a/net/tipc/net.c b/net/tipc/net.c
index a37c8cb..3a1a1b8 100644
--- a/net/tipc/net.c
+++ b/net/tipc/net.c
@@ -136,7 +136,7 @@ void tipc_net_stop(struct net *net)
        if (!self)
                return;
 
-       tipc_bindtbl_unbind(net, TIPC_CFG_SRV, self, 0, self);
+       tipc_bindtbl_unbind(net, TIPC_CFG_SRV, self, self, self);
        rtnl_lock();
        tipc_bearer_stop(net);
        tipc_node_stop(net);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index b7a2413..6c1bb08 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -324,11 +324,11 @@ static void tipc_node_write_unlock(struct tipc_node *n)
        if (flags & TIPC_NOTIFY_LINK_UP) {
                tipc_mon_peer_up(net, addr, bearer_id);
                tipc_bindtbl_bind(net, TIPC_LINK_STATE, addr, addr,
-                                 TIPC_NODE_SCOPE, link_id, addr);
+                                 TIPC_NODE_SCOPE, link_id, link_id);
        }
        if (flags & TIPC_NOTIFY_LINK_DOWN) {
                tipc_mon_peer_down(net, addr, bearer_id);
-               tipc_bindtbl_unbind(net, TIPC_LINK_STATE, addr, link_id, addr);
+               tipc_bindtbl_unbind(net, TIPC_LINK_STATE, addr, addr, link_id);
        }
 }
 
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index a242869..dc8826f 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -2634,11 +2634,11 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint 
scope,
                        if (b->upper != seq->upper)
                                break;
                        tipc_bindtbl_unbind(net, b->type, b->lower,
-                                           b->port, b->key);
+                                           b->upper, b->key);
                        rc = 0;
                        break;
                }
-               tipc_bindtbl_unbind(net, b->type, b->lower, b->port, b->key);
+               tipc_bindtbl_unbind(net, b->type, b->lower, b->upper, b->key);
                rc = 0;
        }
        if (list_empty(&tsk->publications))
-- 
2.1.4


------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, Slashdot.org! http://sdm.link/slashdot
_______________________________________________
tipc-discussion mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to