Hi Hoang,

This is what I did back in 2015. It held up through hundreds of failover tests, and I was ready to deliver it. Ying didn't like it, though, and since it gave no visible performance improvement, I abandoned it for the time being.

Back then I believed we could replace the rwlock with RCU, as even David Miller suggested, but I still cannot see how that would be possible. RCU requires that a copy of the item in question is updated and then linked back in with a single atomic operation, with the old copy freed only once the grace period has expired. The node object is too big to just copy, update and re-link, in my view, but it may well be that there is something here I still don't understand.
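To make the obstacle concrete, this is the standard RCU update pattern I am referring to. It is only a minimal sketch with hypothetical names (foo, gp, foo_update, update_lock), not TIPC code:

#include <linux/rcupdate.h>
#include <linux/slab.h>
#include <linux/spinlock.h>

struct foo {
        int data;
};

static struct foo __rcu *gp;            /* RCU-protected pointer */
static DEFINE_SPINLOCK(update_lock);    /* serializes updaters   */

static int foo_update(int new_data)
{
        struct foo *new, *old;

        new = kmalloc(sizeof(*new), GFP_KERNEL);
        if (!new)
                return -ENOMEM;

        spin_lock(&update_lock);
        old = rcu_dereference_protected(gp, lockdep_is_held(&update_lock));
        *new = *old;                    /* copy the whole object...  */
        new->data = new_data;           /* ...modify the copy...     */
        rcu_assign_pointer(gp, new);    /* ...and re-link atomically */
        spin_unlock(&update_lock);

        synchronize_rcu();              /* wait out the grace period */
        kfree(old);                     /* no reader can see it now  */
        return 0;
}

It is the *new = *old step that I don't see how to do cheaply for something the size of a tipc_node on every state change.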
I suggest we try this again, and then ask for feedback from the community. Either they convince us that this is a bad approach, or we find out that it really is possible to do with RCU instead. Note that the code has changed a bit since this was developed, so the patch won't apply, but it should give you an idea of how to do it.

///jon

-----Original Message-----
From: Jon Maloy
Sent: 26-Mar-19 15:14
To: Jon Maloy <jon.ma...@ericsson.com>; Jon Maloy <ma...@donjonn.com>
Cc: Tung Quang Nguyen <tung.q.ngu...@dektech.com.au>; Hoang Huu Le <hoang.h...@dektech.com.au>; tuong.l.l...@dektech.com.au; tipc-...@dektech.com.au
Subject: [PATCH 1/1] tipc: eliminate node rwlock

Thorough analysis of the code has shown that the node rwlock is not strictly needed. It can be replaced with proper usage of the link locks.

The read mode of this lock is almost always followed by grabbing a link lock, because it is taken when we are sending on, receiving on, or running the timeout handler of a particular link. The write mode means that we are either changing the working state of one or more links, or redistributing the link slots in the node struct itself. Those actions can instead be protected by grabbing the link locks of all the involved links, with no separate node-level lock.

We now change the node locking policy as follows:

- We take a "write mode" lock on the node by just grabbing all
  (currently three) link spinlocks pertaining to that node, in
  ascending bearer id order.
- We release the "write mode" lock by releasing the spinlocks
  again, in the opposite order.
- As an effect of the above, grabbing any one of the link locks
  not only protects the link in question, but now effectively
  serves as a "read mode" lock for the whole node structure.

Since write mode is needed extremely rarely compared to read mode, we can safely make this change without worrying about the extra cost of grabbing multiple spinlocks. In the few locations where the current read mode lock is not followed by grabbing a link lock, we change the locking into "write mode" as described above. This only affects a couple of management calls, and will have no impact on overall performance.

The one subtle spot is the send path: tipc_node_xmit() used to read the active link slot under the read lock. It now reads the slot without any node-level lock, grabs that link's spinlock, re-reads the slot, and retries if a failover changed it in the meantime, as sketched below.
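Condensed from the tipc_node_xmit() hunk in the diff, the recheck-and-retry pattern is:

again:
        prel_id = n->active_links[slc];         /* preliminary, unlocked read */
        if (prel_id >= 0) {
                le = &n->links[prel_id];
                spin_lock_bh(&le->lock);
                /* Re-read under the link lock; a failover may have
                 * changed the active link while we were unlocked
                 */
                bearer_id = n->active_links[slc];
                if (bearer_id == prel_id)
                        rc = tipc_link_xmit(le->link, list, &xmitq);
                spin_unlock_bh(&le->lock);
                if (unlikely(bearer_id != prel_id))
                        goto again;             /* slot changed, start over */
        }

The re-read is reliable because any active-link reshuffle runs in "write mode", i.e. with all link locks held, and so cannot overlap this critical section.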
Signed-off-by: Jon Maloy <jon.ma...@ericsson.com>
---
 net/tipc/node.c | 94 +++++++++++++++++++++++++--------------------------
 1 file changed, 41 insertions(+), 53 deletions(-)

diff --git a/net/tipc/node.c b/net/tipc/node.c
index 3f7a4ed..d3b01f6 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -264,19 +264,12 @@ static struct tipc_node *tipc_node_find(struct net *net, u32 addr)
         return NULL;
 }
 
-static void tipc_node_read_lock(struct tipc_node *n)
-{
-        read_lock_bh(&n->lock);
-}
-
-static void tipc_node_read_unlock(struct tipc_node *n)
-{
-        read_unlock_bh(&n->lock);
-}
-
 static void tipc_node_write_lock(struct tipc_node *n)
 {
-        write_lock_bh(&n->lock);
+        int i = 0;
+
+        for (; i < MAX_BEARERS; i++)
+                spin_lock_bh(&n->links[i].lock);
 }
 
 static void tipc_node_write_unlock(struct tipc_node *n)
@@ -285,13 +278,9 @@ static void tipc_node_write_unlock(struct tipc_node *n)
         u32 addr = 0;
         u32 flags = n->action_flags;
         u32 link_id = 0;
+        int i = 0;
         struct list_head *publ_list;
 
-        if (likely(!flags)) {
-                write_unlock_bh(&n->lock);
-                return;
-        }
-
         addr = n->addr;
         link_id = n->link_id;
         publ_list = &n->publ_list;
@@ -299,7 +288,11 @@ static void tipc_node_write_unlock(struct tipc_node *n)
         n->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
                              TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
 
-        write_unlock_bh(&n->lock);
+        for (i = MAX_BEARERS - 1; i >= 0; i--)
+                spin_unlock_bh(&n->links[i].lock);
+
+        if (!flags)
+                return;
 
         if (flags & TIPC_NOTIFY_NODE_DOWN)
                 tipc_publ_notify(net, publ_list, addr);
@@ -516,7 +509,6 @@ static void tipc_node_timeout(unsigned long data)
         __skb_queue_head_init(&xmitq);
 
         for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
-                tipc_node_read_lock(n);
                 le = &n->links[bearer_id];
                 spin_lock_bh(&le->lock);
                 if (le->link) {
@@ -525,7 +517,6 @@ static void tipc_node_timeout(unsigned long data)
                         rc = tipc_link_timeout(le->link, &xmitq);
                 }
                 spin_unlock_bh(&le->lock);
-                tipc_node_read_unlock(n);
                 tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
                 if (rc & TIPC_LINK_DOWN_EVT)
                         tipc_node_link_down(n, bearer_id, false);
@@ -1113,14 +1104,14 @@ int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr,
         if (bearer_id >= MAX_BEARERS)
                 goto exit;
 
-        tipc_node_read_lock(node);
+        spin_lock_bh(&node->links[bearer_id].lock);
         link = node->links[bearer_id].link;
         if (link) {
                 strncpy(linkname, tipc_link_name(link), len);
                 err = 0;
         }
 exit:
-        tipc_node_read_unlock(node);
+        spin_unlock_bh(&node->links[bearer_id].lock);
         tipc_node_put(node);
         return err;
 }
@@ -1174,21 +1165,25 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
         struct tipc_link_entry *le = NULL;
         struct tipc_node *n;
         struct sk_buff_head xmitq;
-        int bearer_id = -1;
+        int bearer_id = -1, prel_id;
+        int slc = selector & 1;
         int rc = -EHOSTUNREACH;
 
         __skb_queue_head_init(&xmitq);
         n = tipc_node_find(net, dnode);
         if (likely(n)) {
-                tipc_node_read_lock(n);
-                bearer_id = n->active_links[selector & 1];
-                if (bearer_id >= 0) {
-                        le = &n->links[bearer_id];
+again:
+                prel_id = n->active_links[slc];
+                if (prel_id >= 0) {
+                        le = &n->links[prel_id];
                         spin_lock_bh(&le->lock);
-                        rc = tipc_link_xmit(le->link, list, &xmitq);
+                        bearer_id = n->active_links[slc];
+                        if (bearer_id == prel_id)
+                                rc = tipc_link_xmit(le->link, list, &xmitq);
                         spin_unlock_bh(&le->lock);
+                        if (unlikely(bearer_id != prel_id))
+                                goto again;
                 }
-                tipc_node_read_unlock(n);
                 if (likely(!skb_queue_empty(&xmitq))) {
                         tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
                         return 0;
@@ -1292,9 +1287,9 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
 
         /* Broadcast ACKs are sent on a unicast link */
         if (rc & TIPC_LINK_SND_BC_ACK) {
-                tipc_node_read_lock(n);
+                spin_lock_bh(&le->lock);
                 tipc_link_build_ack_msg(le->link, &xmitq);
-                tipc_node_read_unlock(n);
+                spin_unlock_bh(&le->lock);
         }
 
         if (!skb_queue_empty(&xmitq))
@@ -1487,16 +1482,14 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
                 tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack);
 
         /* Receive packet directly if conditions permit */
-        tipc_node_read_lock(n);
+        spin_lock_bh(&le->lock);
         if (likely((n->state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) {
-                spin_lock_bh(&le->lock);
                 if (le->link) {
                         rc = tipc_link_rcv(le->link, skb, &xmitq);
                         skb = NULL;
                 }
-                spin_unlock_bh(&le->lock);
         }
-        tipc_node_read_unlock(n);
+        spin_unlock_bh(&le->lock);
 
         /* Check/update node state before receiving */
         if (unlikely(skb)) {
@@ -1573,15 +1566,15 @@ int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
                         continue;
                 }
 
-                tipc_node_read_lock(node);
+                tipc_node_write_lock(node);
                 err = __tipc_nl_add_node(&msg, node);
                 if (err) {
                         last_addr = node->addr;
-                        tipc_node_read_unlock(node);
+                        tipc_node_write_unlock(node);
                         goto out;
                 }
 
-                tipc_node_read_unlock(node);
+                tipc_node_write_unlock(node);
         }
         done = 1;
 out:
@@ -1612,7 +1605,7 @@ static struct tipc_node *tipc_node_find_by_name(struct net *net,
         *bearer_id = 0;
         rcu_read_lock();
         list_for_each_entry_rcu(n, &tn->node_list, list) {
-                tipc_node_read_lock(n);
+                tipc_node_write_lock(n);
                 for (i = 0; i < MAX_BEARERS; i++) {
                         l = n->links[i].link;
                         if (l && !strcmp(tipc_link_name(l), link_name)) {
@@ -1621,7 +1614,7 @@ static struct tipc_node *tipc_node_find_by_name(struct net *net,
                                 break;
                         }
                 }
-                tipc_node_read_unlock(n);
+                tipc_node_write_unlock(n);
                 if (found_node)
                         break;
         }
@@ -1662,8 +1655,7 @@ int tipc_nl_node_set_link(struct sk_buff *skb, struct genl_info *info)
         if (!node)
                 return -EINVAL;
 
-        tipc_node_read_lock(node);
-
+        spin_lock_bh(&node->links[bearer_id].lock);
         link = node->links[bearer_id].link;
         if (!link) {
                 res = -EINVAL;
@@ -1701,7 +1693,7 @@ int tipc_nl_node_set_link(struct sk_buff *skb, struct genl_info *info)
         }
 
 out:
-        tipc_node_read_unlock(node);
+        spin_unlock_bh(&node->links[bearer_id].lock);
         return res;
 }
 
@@ -1738,17 +1730,16 @@ int tipc_nl_node_get_link(struct sk_buff *skb, struct genl_info *info)
         node = tipc_node_find_by_name(net, name, &bearer_id);
         if (!node)
                 return -EINVAL;
-
-        tipc_node_read_lock(node);
+        spin_lock_bh(&node->links[bearer_id].lock);
         link = node->links[bearer_id].link;
         if (!link) {
-                tipc_node_read_unlock(node);
+                spin_unlock_bh(&node->links[bearer_id].lock);
                 nlmsg_free(msg.skb);
                 return -EINVAL;
         }
 
         err = __tipc_nl_add_link(net, &msg, link, 0);
-        tipc_node_read_unlock(node);
+        spin_unlock_bh(&node->links[bearer_id].lock);
         if (err) {
                 nlmsg_free(msg.skb);
                 return err;
@@ -1795,17 +1786,14 @@ int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info)
                 return -EINVAL;
 
         le = &node->links[bearer_id];
-        tipc_node_read_lock(node);
         spin_lock_bh(&le->lock);
         link = node->links[bearer_id].link;
         if (!link) {
                 spin_unlock_bh(&le->lock);
-                tipc_node_read_unlock(node);
                 return -EINVAL;
         }
         tipc_link_reset_stats(link);
         spin_unlock_bh(&le->lock);
-        tipc_node_read_unlock(node);
         return 0;
 }
 
@@ -1867,10 +1855,10 @@ int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb)
 
                 list_for_each_entry_continue_rcu(node, &tn->node_list,
                                                  list) {
-                        tipc_node_read_lock(node);
+                        tipc_node_write_lock(node);
                         err = __tipc_nl_add_node_links(net, &msg, node,
                                                        &prev_link);
-                        tipc_node_read_unlock(node);
+                        tipc_node_write_unlock(node);
                         if (err)
                                 goto out;
 
@@ -1882,10 +1870,10 @@ int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb)
                         goto out;
 
                 list_for_each_entry_rcu(node, &tn->node_list, list) {
-                        tipc_node_read_lock(node);
+                        tipc_node_write_lock(node);
                         err = __tipc_nl_add_node_links(net, &msg, node,
                                                        &prev_link);
-                        tipc_node_read_unlock(node);
+                        tipc_node_write_unlock(node);
                         if (err)
                                 goto out;
--
2.1.4

_______________________________________________
tipc-discussion mailing list
tipc-discussion@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tipc-discussion