Hi. I remember this 😊 The node struct size could potentially be a problem on preempt kernels if we need to do synchronize_rcu() in a hot path (tipc_rcv). But I don't think that's necessary, and I think the RCU approach is preferable.
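Freeing the node with call_rcu() instead of waiting with synchronize_rcu() would keep the grace period entirely off the receive path. A minimal sketch along those lines, with a heavily simplified struct and made-up names (node_sketch, node_delete), not the actual TIPC code:

#include <linux/rculist.h>
#include <linux/rcupdate.h>
#include <linux/slab.h>
#include <linux/types.h>

/* Heavily simplified stand-in for struct tipc_node, for illustration only */
struct node_sketch {
	struct hlist_node hash;		/* node hash list, traversed under RCU */
	u32 addr;
	struct rcu_head rcu;		/* used for deferred freeing */
};

static void node_free_rcu(struct rcu_head *head)
{
	kfree(container_of(head, struct node_sketch, rcu));
}

static void node_delete(struct node_sketch *n)
{
	/* Unlink under whatever lock protects the hash table (not shown),
	 * then defer the free. Readers inside rcu_read_lock() sections,
	 * e.g. the receive path, may keep using the old object until the
	 * grace period ends; nothing here ever calls synchronize_rcu().
	 */
	hlist_del_rcu(&n->hash);
	call_rcu(&n->rcu, node_free_rcu);
}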
//E

On Tue, 26 Mar 2019, 22:03 Jon Maloy, <jon.ma...@ericsson.com> wrote:

> Hi Hoang,
> This is what I did back in 2015. It worked fully with hundreds of failover
> tests, and I was ready to deliver it.
> Ying didn't like it though, and since it didn't give any visible
> performance improvements, I abandoned it for the time being.
>
> Back then I believed we could replace the rw lock with an rcu lock, as
> even suggested by David Miller, but I still cannot see how that would be
> possible.
> RCU locks require that a copy of the item in question is updated, and then
> linked in as an atomic operation at the end of the grace period.
> The node object is too big to just copy, update and re-link, in my
> view, but it might be possible that there is still something I don't
> understand regarding this.
>
> I suggest we try with this again, and then ask for feedback from the
> community. Either they must convince us that this is a bad approach, or
> that it is possible to do by using RCU locks instead.
>
> Note that the code has changed a bit since this was developed, so the
> patch won't apply, but it should give you an idea about how to do it.
>
> ///jon
>
>
> -----Original Message-----
> From: Jon Maloy
> Sent: 26-Mar-19 15:14
> To: Jon Maloy <jon.ma...@ericsson.com>; Jon Maloy <ma...@donjonn.com>
> Cc: Tung Quang Nguyen <tung.q.ngu...@dektech.com.au>; Hoang Huu Le
>     <hoang.h...@dektech.com.au>; tuong.l.l...@dektech.com.au;
>     tipc-...@dektech.com.au
> Subject: [PATCH 1/1] tipc: eliminate node rwlock
>
> Thorough analysis of the code has shown that the node rwlock is not
> strictly needed. Instead, it can be replaced with proper usage of the
> link locks.
>
> The read mode of this lock is almost always followed by grabbing a link
> lock, because it is used for either sending, receiving or running a
> timeout handler on a particular link.
>
> The write mode means that we are either changing the working state of one
> or more links, or that we are redistributing the link slots in the node
> struct itself. These actions can be protected by grabbing the link lock of
> all the involved links, instead of using any separate lock for this.
>
> We now change the node locking policy as follows:
>
> - We grab a "write mode" lock for the node by just taking all (currently
>   three) link spinlocks pertaining to that node.
>
> - We "release" the write mode lock by releasing all the spinlocks again,
>   in the opposite order.
>
> - As an effect of the above, grabbing any one of the link locks not
>   only protects the link in question, but now effectively serves as
>   a "read mode" lock for the whole node structure.
>
> Since the ratio read_mode/write_mode is extremely low, we can safely make
> this change without worrying about the extra cost of grabbing multiple
> spinlocks.
>
> In a few locations, where the current read mode lock is not followed by
> grabbing a link lock, we change the locking into "write mode" as described
> above. This only affects a couple of management calls, and will have no
> impact on overall performance.
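For reference, the copy/update/relink cycle Jon describes above would look roughly like the sketch below. This is illustrative only: node_sketch, the_node and node_set_active_link() are made-up names, a single published pointer stands in for the node hash table, and the real tipc_node carries far more state, which is exactly what makes the copy on every update look expensive.

#include <linux/errno.h>
#include <linux/rcupdate.h>
#include <linux/slab.h>
#include <linux/spinlock.h>
#include <linux/types.h>

/* Illustrative stand-in; the real tipc_node has many more fields */
struct node_sketch {
	u32 addr;
	int active_links[2];
	struct rcu_head rcu;
};

static struct node_sketch __rcu *the_node;	/* published pointer */
static DEFINE_SPINLOCK(node_update_lock);	/* serializes writers */

static int node_set_active_link(int slot, int bearer_id)
{
	struct node_sketch *old_n, *new_n;

	new_n = kmalloc(sizeof(*new_n), GFP_ATOMIC);
	if (!new_n)
		return -ENOMEM;

	spin_lock_bh(&node_update_lock);
	old_n = rcu_dereference_protected(the_node,
					  lockdep_is_held(&node_update_lock));
	*new_n = *old_n;			/* copy the whole object ...  */
	new_n->active_links[slot] = bearer_id;	/* ... update the copy ...    */
	rcu_assign_pointer(the_node, new_n);	/* ... and relink atomically  */
	spin_unlock_bh(&node_update_lock);

	kfree_rcu(old_n, rcu);			/* free after the grace period */
	return 0;
}

Every writer pays for an allocation plus a full copy of the object, which is why the patch below instead keeps the node in place and treats the set of link spinlocks as the write-side lock.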
>
> Signed-off-by: Jon Maloy <jon.ma...@ericsson.com>
> ---
>  net/tipc/node.c | 94 +++++++++++++++++++++++++--------------------------
>  1 file changed, 41 insertions(+), 53 deletions(-)
>
> diff --git a/net/tipc/node.c b/net/tipc/node.c
> index 3f7a4ed..d3b01f6 100644
> --- a/net/tipc/node.c
> +++ b/net/tipc/node.c
> @@ -264,19 +264,12 @@ static struct tipc_node *tipc_node_find(struct net *net, u32 addr)
>  	return NULL;
>  }
>
> -static void tipc_node_read_lock(struct tipc_node *n)
> -{
> -	read_lock_bh(&n->lock);
> -}
> -
> -static void tipc_node_read_unlock(struct tipc_node *n)
> -{
> -	read_unlock_bh(&n->lock);
> -}
> -
>  static void tipc_node_write_lock(struct tipc_node *n)
>  {
> -	write_lock_bh(&n->lock);
> +	int i = 0;
> +
> +	for (; i < MAX_BEARERS; i++)
> +		spin_lock_bh(&n->links[i].lock);
>  }
>
>  static void tipc_node_write_unlock(struct tipc_node *n)
> @@ -285,13 +278,9 @@ static void tipc_node_write_unlock(struct tipc_node *n)
>  	u32 addr = 0;
>  	u32 flags = n->action_flags;
>  	u32 link_id = 0;
> +	int i = 0;
>  	struct list_head *publ_list;
>
> -	if (likely(!flags)) {
> -		write_unlock_bh(&n->lock);
> -		return;
> -	}
> -
>  	addr = n->addr;
>  	link_id = n->link_id;
>  	publ_list = &n->publ_list;
> @@ -299,7 +288,11 @@ static void tipc_node_write_unlock(struct tipc_node *n)
>  	n->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
>  			     TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
>
> -	write_unlock_bh(&n->lock);
> +	for (i = MAX_BEARERS - 1; i >= 0; i--)
> +		spin_unlock_bh(&n->links[i].lock);
> +
> +	if (!flags)
> +		return;
>
>  	if (flags & TIPC_NOTIFY_NODE_DOWN)
>  		tipc_publ_notify(net, publ_list, addr);
> @@ -516,7 +509,6 @@ static void tipc_node_timeout(unsigned long data)
>  	__skb_queue_head_init(&xmitq);
>
>  	for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
> -		tipc_node_read_lock(n);
>  		le = &n->links[bearer_id];
>  		spin_lock_bh(&le->lock);
>  		if (le->link) {
> @@ -525,7 +517,6 @@ static void tipc_node_timeout(unsigned long data)
>  			rc = tipc_link_timeout(le->link, &xmitq);
>  		}
>  		spin_unlock_bh(&le->lock);
> -		tipc_node_read_unlock(n);
>  		tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
>  		if (rc & TIPC_LINK_DOWN_EVT)
>  			tipc_node_link_down(n, bearer_id, false);
> @@ -1113,14 +1104,14 @@ int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr,
>  	if (bearer_id >= MAX_BEARERS)
>  		goto exit;
>
> -	tipc_node_read_lock(node);
> +	spin_lock_bh(&node->links[bearer_id].lock);
>  	link = node->links[bearer_id].link;
>  	if (link) {
>  		strncpy(linkname, tipc_link_name(link), len);
>  		err = 0;
>  	}
>  exit:
> -	tipc_node_read_unlock(node);
> +	spin_unlock_bh(&node->links[bearer_id].lock);
>  	tipc_node_put(node);
>  	return err;
>  }
> @@ -1174,21 +1165,25 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
>  	struct tipc_link_entry *le = NULL;
>  	struct tipc_node *n;
>  	struct sk_buff_head xmitq;
> -	int bearer_id = -1;
> +	int bearer_id = -1, prel_id;
> +	int slc = selector & 1;
>  	int rc = -EHOSTUNREACH;
>
>  	__skb_queue_head_init(&xmitq);
>  	n = tipc_node_find(net, dnode);
>  	if (likely(n)) {
> -		tipc_node_read_lock(n);
> -		bearer_id = n->active_links[selector & 1];
> -		if (bearer_id >= 0) {
> -			le = &n->links[bearer_id];
> +again:
> +		prel_id = n->active_links[slc];
> +		if (prel_id >= 0) {
> +			le = &n->links[prel_id];
>  			spin_lock_bh(&le->lock);
> -			rc = tipc_link_xmit(le->link, list, &xmitq);
> +			bearer_id = n->active_links[slc];
> +			if (bearer_id == prel_id)
> +				rc = tipc_link_xmit(le->link, list, &xmitq);
>  			spin_unlock_bh(&le->lock);
> +			if (unlikely(bearer_id != prel_id))
> +				goto again;
>  		}
> -		tipc_node_read_unlock(n);
>  		if (likely(!skb_queue_empty(&xmitq))) {
>  			tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
>  			return 0;
> @@ -1292,9 +1287,9 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
>
>  	/* Broadcast ACKs are sent on a unicast link */
>  	if (rc & TIPC_LINK_SND_BC_ACK) {
> -		tipc_node_read_lock(n);
> +		spin_lock_bh(&le->lock);
>  		tipc_link_build_ack_msg(le->link, &xmitq);
> -		tipc_node_read_unlock(n);
> +		spin_unlock_bh(&le->lock);
>  	}
>
>  	if (!skb_queue_empty(&xmitq))
> @@ -1487,16 +1482,14 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
>  		tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack);
>
>  	/* Receive packet directly if conditions permit */
> -	tipc_node_read_lock(n);
> +	spin_lock_bh(&le->lock);
>  	if (likely((n->state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) {
> -		spin_lock_bh(&le->lock);
>  		if (le->link) {
>  			rc = tipc_link_rcv(le->link, skb, &xmitq);
>  			skb = NULL;
>  		}
> -		spin_unlock_bh(&le->lock);
>  	}
> -	tipc_node_read_unlock(n);
> +	spin_unlock_bh(&le->lock);
>
>  	/* Check/update node state before receiving */
>  	if (unlikely(skb)) {
> @@ -1573,15 +1566,15 @@ int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
>  			continue;
>  		}
>
> -		tipc_node_read_lock(node);
> +		tipc_node_write_lock(node);
>  		err = __tipc_nl_add_node(&msg, node);
>  		if (err) {
>  			last_addr = node->addr;
> -			tipc_node_read_unlock(node);
> +			tipc_node_write_unlock(node);
>  			goto out;
>  		}
>
> -		tipc_node_read_unlock(node);
> +		tipc_node_write_unlock(node);
>  	}
>  	done = 1;
>  out:
> @@ -1612,7 +1605,7 @@ static struct tipc_node *tipc_node_find_by_name(struct net *net,
>  	*bearer_id = 0;
>  	rcu_read_lock();
>  	list_for_each_entry_rcu(n, &tn->node_list, list) {
> -		tipc_node_read_lock(n);
> +		tipc_node_write_lock(n);
>  		for (i = 0; i < MAX_BEARERS; i++) {
>  			l = n->links[i].link;
>  			if (l && !strcmp(tipc_link_name(l), link_name)) {
> @@ -1621,7 +1614,7 @@ static struct tipc_node *tipc_node_find_by_name(struct net *net,
>  				break;
>  			}
>  		}
> -		tipc_node_read_unlock(n);
> +		tipc_node_write_unlock(n);
>  		if (found_node)
>  			break;
>  	}
> @@ -1662,8 +1655,7 @@ int tipc_nl_node_set_link(struct sk_buff *skb, struct genl_info *info)
>  	if (!node)
>  		return -EINVAL;
>
> -	tipc_node_read_lock(node);
> -
> +	spin_lock_bh(&node->links[bearer_id].lock);
>  	link = node->links[bearer_id].link;
>  	if (!link) {
>  		res = -EINVAL;
> @@ -1701,7 +1693,7 @@ int tipc_nl_node_set_link(struct sk_buff *skb, struct genl_info *info)
>  	}
>
>  out:
> -	tipc_node_read_unlock(node);
> +	spin_unlock_bh(&node->links[bearer_id].lock);
>
>  	return res;
>  }
> @@ -1738,17 +1730,16 @@ int tipc_nl_node_get_link(struct sk_buff *skb, struct genl_info *info)
>  	node = tipc_node_find_by_name(net, name, &bearer_id);
>  	if (!node)
>  		return -EINVAL;
> -
> -	tipc_node_read_lock(node);
> +	spin_lock_bh(&node->links[bearer_id].lock);
>  	link = node->links[bearer_id].link;
>  	if (!link) {
> -		tipc_node_read_unlock(node);
> +		spin_unlock_bh(&node->links[bearer_id].lock);
>  		nlmsg_free(msg.skb);
>  		return -EINVAL;
>  	}
>
>  	err = __tipc_nl_add_link(net, &msg, link, 0);
> -	tipc_node_read_unlock(node);
> +	spin_unlock_bh(&node->links[bearer_id].lock);
>  	if (err) {
>  		nlmsg_free(msg.skb);
>  		return err;
> @@ -1795,17 +1786,14 @@ int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info)
>  		return -EINVAL;
>
>  	le = &node->links[bearer_id];
> -	tipc_node_read_lock(node);
>  	spin_lock_bh(&le->lock);
>  	link = node->links[bearer_id].link;
>  	if (!link) {
>  		spin_unlock_bh(&le->lock);
> -		tipc_node_read_unlock(node);
>  		return -EINVAL;
>  	}
>  	tipc_link_reset_stats(link);
>  	spin_unlock_bh(&le->lock);
> -	tipc_node_read_unlock(node);
>  	return 0;
>  }
>
> @@ -1867,10 +1855,10 @@ int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb)
>
>  		list_for_each_entry_continue_rcu(node, &tn->node_list,
>  						 list) {
> -			tipc_node_read_lock(node);
> +			tipc_node_write_lock(node);
>  			err = __tipc_nl_add_node_links(net, &msg, node,
>  						       &prev_link);
> -			tipc_node_read_unlock(node);
> +			tipc_node_write_unlock(node);
>  			if (err)
>  				goto out;
>
> @@ -1882,10 +1870,10 @@ int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb)
>  			goto out;
>
>  		list_for_each_entry_rcu(node, &tn->node_list, list) {
> -			tipc_node_read_lock(node);
> +			tipc_node_write_lock(node);
>  			err = __tipc_nl_add_node_links(net, &msg, node,
>  						       &prev_link);
> -			tipc_node_read_unlock(node);
> +			tipc_node_write_unlock(node);
>  			if (err)
>  				goto out;
>
> --
> 2.1.4

_______________________________________________
tipc-discussion mailing list
tipc-discussion@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tipc-discussion