Hi Hoang,
This is what I did back in 2015. It held up through hundreds of failover 
tests, and I was ready to deliver it.
Ying didn't like it though, and since it didn't give any visible performance 
improvement, I abandoned it for the time being.

Back then I believed we could replace the rwlock with RCU, as even 
suggested by David Miller, but I still cannot see how that would be possible.
RCU requires that a copy of the item in question is updated and then linked 
in with a single atomic pointer assignment; the old copy can only be freed 
after a grace period has elapsed.
The node object is too big to just copy, update and re-link, in my view, but 
it is possible that there is still something I don't understand here.

I suggest we try this again, and then ask for feedback from the community. 
Either they convince us that this is a bad approach, or that it is in fact 
possible to do with RCU instead.

Note that the code has changed a bit since this was developed, so the patch 
won't apply, but it should give you an idea about how to do it.

///jon


-----Original Message-----
From: Jon Maloy 
Sent: 26-Mar-19 15:14
To: Jon Maloy <jon.ma...@ericsson.com>; Jon Maloy <ma...@donjonn.com>
Cc: Tung Quang Nguyen <tung.q.ngu...@dektech.com.au>; Hoang Huu Le 
<hoang.h...@dektech.com.au>; tuong.l.l...@dektech.com.au; 
tipc-...@dektech.com.au
Subject: [PATCH 1/1] tipc: eliminate node rwlock

Thorough analysis of the code has shown that the node rwlock is not strictly 
needed. Instead, it can be replaced with proper usage of the link locks.

The read mode of this lock is almost always followed by grabbing a link lock, 
because it is used for either sending, receiving or running a timeout handler 
on a particular link.

The write mode means that we are either changing the working state of one or 
more links, or that we are redistributing the link slots in the node struct 
itself. These actions can be protected by grabbing the link lock of all the 
involved links, instead of using any separate lock for this.

We now change the node locking policy as follows:

- We grab a "write mode" lock for the node by just taking all (currently
  three) link spinlocks pertaining to that node.

- We "release" the write mode lock by releasing all the spinlocks again,
  in the opposite order.

- As an effect of the above, grabbing any one of the link locks not
  only protects the link in question, but now effectively serves as
  a "read mode" lock for the whole node structure.

Since the ratio write_mode/read_mode is extremely low, we can safely make this 
change without worrying about the extra cost of grabbing multiple spinlocks.

In a few locations, where the current read mode lock is not followed by 
grabbing a link lock, we change the locking into "write mode" as described 
above. This only affects a couple of management calls, and will have no impact 
on overall performance.

Signed-off-by: Jon Maloy <jon.ma...@ericsson.com>
---
 net/tipc/node.c | 94 +++++++++++++++++++++++++--------------------------------
 1 file changed, 41 insertions(+), 53 deletions(-)

diff --git a/net/tipc/node.c b/net/tipc/node.c
index 3f7a4ed..d3b01f6 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -264,19 +264,12 @@ static struct tipc_node *tipc_node_find(struct net *net, u32 addr)
        return NULL;
 }
 
-static void tipc_node_read_lock(struct tipc_node *n)
-{
-       read_lock_bh(&n->lock);
-}
-
-static void tipc_node_read_unlock(struct tipc_node *n)
-{
-       read_unlock_bh(&n->lock);
-}
-
 static void tipc_node_write_lock(struct tipc_node *n)
 {
-       write_lock_bh(&n->lock);
+       int i = 0;
+
+       for (; i < MAX_BEARERS; i++)
+               spin_lock_bh(&n->links[i].lock);
 }
 
 static void tipc_node_write_unlock(struct tipc_node *n)
@@ -285,13 +278,9 @@ static void tipc_node_write_unlock(struct tipc_node *n)
        u32 addr = 0;
        u32 flags = n->action_flags;
        u32 link_id = 0;
+       int i = 0;
        struct list_head *publ_list;
 
-       if (likely(!flags)) {
-               write_unlock_bh(&n->lock);
-               return;
-       }
-
        addr = n->addr;
        link_id = n->link_id;
        publ_list = &n->publ_list;
@@ -299,7 +288,11 @@ static void tipc_node_write_unlock(struct tipc_node *n)
        n->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
                             TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
 
-       write_unlock_bh(&n->lock);
+       for (i = MAX_BEARERS - 1; i >= 0; i--)
+               spin_unlock_bh(&n->links[i].lock);
+
+       if (!flags)
+               return;
 
        if (flags & TIPC_NOTIFY_NODE_DOWN)
                tipc_publ_notify(net, publ_list, addr);
@@ -516,7 +509,6 @@ static void tipc_node_timeout(unsigned long data)
        __skb_queue_head_init(&xmitq);
 
        for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
-               tipc_node_read_lock(n);
                le = &n->links[bearer_id];
                spin_lock_bh(&le->lock);
                if (le->link) {
@@ -525,7 +517,6 @@ static void tipc_node_timeout(unsigned long data)
                        rc = tipc_link_timeout(le->link, &xmitq);
                }
                spin_unlock_bh(&le->lock);
-               tipc_node_read_unlock(n);
                tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
                if (rc & TIPC_LINK_DOWN_EVT)
                        tipc_node_link_down(n, bearer_id, false);
@@ -1113,14 +1104,14 @@ int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr,
        if (bearer_id >= MAX_BEARERS)
                goto exit;
 
-       tipc_node_read_lock(node);
+       spin_lock_bh(&node->links[bearer_id].lock);
        link = node->links[bearer_id].link;
        if (link) {
                strncpy(linkname, tipc_link_name(link), len);
                err = 0;
        }
 exit:
-       tipc_node_read_unlock(node);
+       spin_unlock_bh(&node->links[bearer_id].lock);
        tipc_node_put(node);
        return err;
 }
@@ -1174,21 +1165,25 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
        struct tipc_link_entry *le = NULL;
        struct tipc_node *n;
        struct sk_buff_head xmitq;
-       int bearer_id = -1;
+       int bearer_id = -1, prel_id;
+       int slc = selector & 1;
        int rc = -EHOSTUNREACH;
 
        __skb_queue_head_init(&xmitq);
        n = tipc_node_find(net, dnode);
        if (likely(n)) {
-               tipc_node_read_lock(n);
-               bearer_id = n->active_links[selector & 1];
-               if (bearer_id >= 0) {
-                       le = &n->links[bearer_id];
+again:
+               prel_id = n->active_links[slc];
+               if (prel_id >= 0) {
+                       le = &n->links[prel_id];
                        spin_lock_bh(&le->lock);
-                       rc = tipc_link_xmit(le->link, list, &xmitq);
+                       bearer_id = n->active_links[slc];
+                       if (bearer_id == prel_id)
+                               rc = tipc_link_xmit(le->link, list, &xmitq);
                        spin_unlock_bh(&le->lock);
+                       if (unlikely(bearer_id != prel_id))
+                               goto again;
                }
-               tipc_node_read_unlock(n);
                if (likely(!skb_queue_empty(&xmitq))) {
                        tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
                        return 0;
@@ -1292,9 +1287,9 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
 
        /* Broadcast ACKs are sent on a unicast link */
        if (rc & TIPC_LINK_SND_BC_ACK) {
-               tipc_node_read_lock(n);
+               spin_lock_bh(&le->lock);
                tipc_link_build_ack_msg(le->link, &xmitq);
-               tipc_node_read_unlock(n);
+               spin_unlock_bh(&le->lock);
        }
 
        if (!skb_queue_empty(&xmitq))
@@ -1487,16 +1482,14 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
                tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack);
 
        /* Receive packet directly if conditions permit */
-       tipc_node_read_lock(n);
+       spin_lock_bh(&le->lock);
        if (likely((n->state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) {
-               spin_lock_bh(&le->lock);
                if (le->link) {
                        rc = tipc_link_rcv(le->link, skb, &xmitq);
                        skb = NULL;
                }
-               spin_unlock_bh(&le->lock);
        }
-       tipc_node_read_unlock(n);
+       spin_unlock_bh(&le->lock);
 
        /* Check/update node state before receiving */
        if (unlikely(skb)) {
@@ -1573,15 +1566,15 @@ int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
                                continue;
                }
 
-               tipc_node_read_lock(node);
+               tipc_node_write_lock(node);
                err = __tipc_nl_add_node(&msg, node);
                if (err) {
                        last_addr = node->addr;
-                       tipc_node_read_unlock(node);
+                       tipc_node_write_unlock(node);
                        goto out;
                }
 
-               tipc_node_read_unlock(node);
+               tipc_node_write_unlock(node);
        }
        done = 1;
 out:
@@ -1612,7 +1605,7 @@ static struct tipc_node *tipc_node_find_by_name(struct net *net,
        *bearer_id = 0;
        rcu_read_lock();
        list_for_each_entry_rcu(n, &tn->node_list, list) {
-               tipc_node_read_lock(n);
+               tipc_node_write_lock(n);
                for (i = 0; i < MAX_BEARERS; i++) {
                        l = n->links[i].link;
                        if (l && !strcmp(tipc_link_name(l), link_name)) {
@@ -1621,7 +1614,7 @@ static struct tipc_node *tipc_node_find_by_name(struct net *net,
                                break;
                        }
                }
-               tipc_node_read_unlock(n);
+               tipc_node_write_unlock(n);
                if (found_node)
                        break;
        }
@@ -1662,8 +1655,7 @@ int tipc_nl_node_set_link(struct sk_buff *skb, struct genl_info *info)
        if (!node)
                return -EINVAL;
 
-       tipc_node_read_lock(node);
-
+       spin_lock_bh(&node->links[bearer_id].lock);
        link = node->links[bearer_id].link;
        if (!link) {
                res = -EINVAL;
@@ -1701,7 +1693,7 @@ int tipc_nl_node_set_link(struct sk_buff *skb, struct genl_info *info)
        }
 
 out:
-       tipc_node_read_unlock(node);
+       spin_unlock_bh(&node->links[bearer_id].lock);
 
        return res;
 }
@@ -1738,17 +1730,16 @@ int tipc_nl_node_get_link(struct sk_buff *skb, struct genl_info *info)
                node = tipc_node_find_by_name(net, name, &bearer_id);
                if (!node)
                        return -EINVAL;
-
-               tipc_node_read_lock(node);
+               spin_lock_bh(&node->links[bearer_id].lock);
                link = node->links[bearer_id].link;
                if (!link) {
-                       tipc_node_read_unlock(node);
+                       spin_unlock_bh(&node->links[bearer_id].lock);
                        nlmsg_free(msg.skb);
                        return -EINVAL;
                }
 
                err = __tipc_nl_add_link(net, &msg, link, 0);
-               tipc_node_read_unlock(node);
+               spin_unlock_bh(&node->links[bearer_id].lock);
                if (err) {
                        nlmsg_free(msg.skb);
                        return err;
@@ -1795,17 +1786,14 @@ int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info)
                return -EINVAL;
 
        le = &node->links[bearer_id];
-       tipc_node_read_lock(node);
        spin_lock_bh(&le->lock);
        link = node->links[bearer_id].link;
        if (!link) {
                spin_unlock_bh(&le->lock);
-               tipc_node_read_unlock(node);
                return -EINVAL;
        }
        tipc_link_reset_stats(link);
        spin_unlock_bh(&le->lock);
-       tipc_node_read_unlock(node);
        return 0;
 }
 
@@ -1867,10 +1855,10 @@ int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb)
 
                list_for_each_entry_continue_rcu(node, &tn->node_list,
                                                 list) {
-                       tipc_node_read_lock(node);
+                       tipc_node_write_lock(node);
                        err = __tipc_nl_add_node_links(net, &msg, node,
                                                       &prev_link);
-                       tipc_node_read_unlock(node);
+                       tipc_node_write_unlock(node);
                        if (err)
                                goto out;
 
@@ -1882,10 +1870,10 @@ int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb)
                        goto out;
 
                list_for_each_entry_rcu(node, &tn->node_list, list) {
-                       tipc_node_read_lock(node);
+                       tipc_node_write_lock(node);
                        err = __tipc_nl_add_node_links(net, &msg, node,
                                                       &prev_link);
-                       tipc_node_read_unlock(node);
+                       tipc_node_write_unlock(node);
                        if (err)
                                goto out;
 
--
2.1.4



_______________________________________________
tipc-discussion mailing list
tipc-discussion@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tipc-discussion
