Hi. I remember this 😊

The node struct size could potentially be a problem on preempt kernels if
we need to do synchronize_rcu() in a hot path (tipc_rcv).
But I don't think that is necessary, and I still think the RCU approach is
preferable.
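
Roughly the pattern I have in mind is sketched below. This is illustration
only, not the real tipc_node layout: demo_node, demo_node_find() and
demo_node_delete() are made-up names, and the assumption is that mutable
per-link state would keep its own spinlocks as in Jon's patch.

#include <linux/types.h>
#include <linux/rculist.h>
#include <linux/rcupdate.h>
#include <linux/slab.h>
#include <linux/spinlock.h>

struct demo_node {
        struct hlist_node hash;         /* RCU-protected hash list linkage */
        struct rcu_head rcu;            /* for deferred free */
        u32 addr;
        /* mutable per-link state would stay under the link spinlocks */
};

/* Hot path (tipc_rcv-like lookup): only an RCU read-side critical
 * section, no synchronize_rcu() and no copying of the node struct.
 */
static struct demo_node *demo_node_find(struct hlist_head *head, u32 addr)
{
        struct demo_node *n;

        rcu_read_lock();
        hlist_for_each_entry_rcu(n, head, hash) {
                if (n->addr == addr) {
                        /* real code would take a refcount before
                         * leaving the read-side section
                         */
                        rcu_read_unlock();
                        return n;
                }
        }
        rcu_read_unlock();
        return NULL;
}

/* Slow path (node removal): unlink under the list lock and defer the
 * free past the grace period; kfree_rcu() avoids blocking at all.
 */
static void demo_node_delete(spinlock_t *list_lock, struct demo_node *n)
{
        spin_lock_bh(list_lock);
        hlist_del_rcu(&n->hash);
        spin_unlock_bh(list_lock);
        kfree_rcu(n, rcu);
}

So the grace-period wait never sits in the receive path; how well this maps
onto the real node/link structures is exactly the part where community
feedback would help.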

//E


On Tue, 26 Mar 2019, 22:03 Jon Maloy, <jon.ma...@ericsson.com> wrote:

> Hi Hoang,
> This is what I did back in 2015. It worked fully with hundreds of failover
> tests, and I was ready to deliver it.
> Ying didn't like it though, and since it didn't give any visible
> performance improvements, I abandoned it for the time being.
>
> Back then I believed we could replace the rw lock with an rcu lock, as
> even suggested by David Miller, but I still cannot see how that would be
> possible.
> RCU requires that a copy of the item in question is updated and then
> linked in with a single atomic pointer assignment, with the old copy
> freed only after the grace period.
> The node object is too big to just copy, update and re-link, in my
> view, but it may be that there is still something I don't understand
> here.
>
> I suggest we try this again and then ask for feedback from the
> community. Either they convince us that this is a bad approach, or
> they show that it can be done with RCU locks instead.
>
> Note that the code has changed a bit since this was developed, so the
> patch won't apply, but it should give you an idea about how to do it.
>
> ///jon
>
>
> -----Original Message-----
> From: Jon Maloy
> Sent: 26-Mar-19 15:14
> To: Jon Maloy <jon.ma...@ericsson.com>; Jon Maloy <ma...@donjonn.com>
> Cc: Tung Quang Nguyen <tung.q.ngu...@dektech.com.au>; Hoang Huu Le <
> hoang.h...@dektech.com.au>; tuong.l.l...@dektech.com.au;
> tipc-...@dektech.com.au
> Subject: [PATCH 1/1] tipc: eliminate node rwlock
>
> Thorough analysis of the code has shown that the node rwlock is not
> strictly needed. Instead, it can be replaced with proper usage of the link
> locks.
>
> The read mode of this lock is almost always followed by grabbing a link
> lock, because it is used for either sending, receiving or running a timeout
> handler on a particular link.
>
> The write mode means that we are either changing the working state of one
> or more links, or that we are redistributing the link slots in the node
> struct itself. These actions can be protected by grabbing the link lock of
> all the involved links, instead of using any separate lock for this.
>
> We now change the node locking policy as follows:
>
> - We grab a "write mode" lock for the node by just taking all (currently
>   three) link spinlocks pertaining to that node.
>
> - We "release" the write mode lock by releasing all the spinlocks again,
>   in the opposite order.
>
> - As an effect of the above, grabbing any one of the link locks not
>   only protects the link in question, but now effectively serves as
>   a "read mode" lock for the whole node structure.
>
> Since write mode is taken extremely rarely compared to read mode, we can
> safely make this change without worrying about the extra cost of grabbing
> multiple spinlocks.
>
> In a few locations, where the current read mode lock is not followed by
> grabbing a link lock, we change the locking into "write mode" as described
> above. This only affects a couple of management calls, and will have no
> impact on overall performance.
>
> Signed-off-by: Jon Maloy <jon.ma...@ericsson.com>
> ---
>  net/tipc/node.c | 94 +++++++++++++++++++++++++--------------------------------
>  1 file changed, 41 insertions(+), 53 deletions(-)
>
> diff --git a/net/tipc/node.c b/net/tipc/node.c
> index 3f7a4ed..d3b01f6 100644
> --- a/net/tipc/node.c
> +++ b/net/tipc/node.c
> @@ -264,19 +264,12 @@ static struct tipc_node *tipc_node_find(struct net *net, u32 addr)
>         return NULL;
>  }
>
> -static void tipc_node_read_lock(struct tipc_node *n)
> -{
> -       read_lock_bh(&n->lock);
> -}
> -
> -static void tipc_node_read_unlock(struct tipc_node *n)
> -{
> -       read_unlock_bh(&n->lock);
> -}
> -
>  static void tipc_node_write_lock(struct tipc_node *n)
>  {
> -       write_lock_bh(&n->lock);
> +       int i = 0;
> +
> +       for (; i < MAX_BEARERS; i++)
> +               spin_lock_bh(&n->links[i].lock);
>  }
>
>  static void tipc_node_write_unlock(struct tipc_node *n)
> @@ -285,13 +278,9 @@ static void tipc_node_write_unlock(struct tipc_node *n)
>         u32 addr = 0;
>         u32 flags = n->action_flags;
>         u32 link_id = 0;
> +       int i = 0;
>         struct list_head *publ_list;
>
> -       if (likely(!flags)) {
> -               write_unlock_bh(&n->lock);
> -               return;
> -       }
> -
>         addr = n->addr;
>         link_id = n->link_id;
>         publ_list = &n->publ_list;
> @@ -299,7 +288,11 @@ static void tipc_node_write_unlock(struct tipc_node *n)
>         n->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
>                              TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
>
> -       write_unlock_bh(&n->lock);
> +       for (i = MAX_BEARERS - 1; i >= 0; i--)
> +               spin_unlock_bh(&n->links[i].lock);
> +
> +       if (!flags)
> +               return;
>
>         if (flags & TIPC_NOTIFY_NODE_DOWN)
>                 tipc_publ_notify(net, publ_list, addr);
> @@ -516,7 +509,6 @@ static void tipc_node_timeout(unsigned long data)
>         __skb_queue_head_init(&xmitq);
>
>         for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
> -               tipc_node_read_lock(n);
>                 le = &n->links[bearer_id];
>                 spin_lock_bh(&le->lock);
>                 if (le->link) {
> @@ -525,7 +517,6 @@ static void tipc_node_timeout(unsigned long data)
>                         rc = tipc_link_timeout(le->link, &xmitq);
>                 }
>                 spin_unlock_bh(&le->lock);
> -               tipc_node_read_unlock(n);
>                 tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
>                 if (rc & TIPC_LINK_DOWN_EVT)
>                         tipc_node_link_down(n, bearer_id, false);
> @@ -1113,14 +1104,14 @@ int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr,
>         if (bearer_id >= MAX_BEARERS)
>                 goto exit;
>
> -       tipc_node_read_lock(node);
> +       spin_lock_bh(&node->links[bearer_id].lock);
>         link = node->links[bearer_id].link;
>         if (link) {
>                 strncpy(linkname, tipc_link_name(link), len);
>                 err = 0;
>         }
>  exit:
> -       tipc_node_read_unlock(node);
> +       spin_unlock_bh(&node->links[bearer_id].lock);
>         tipc_node_put(node);
>         return err;
>  }
> @@ -1174,21 +1165,25 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
>         struct tipc_link_entry *le = NULL;
>         struct tipc_node *n;
>         struct sk_buff_head xmitq;
> -       int bearer_id = -1;
> +       int bearer_id = -1, prel_id;
> +       int slc = selector & 1;
>         int rc = -EHOSTUNREACH;
>
>         __skb_queue_head_init(&xmitq);
>         n = tipc_node_find(net, dnode);
>         if (likely(n)) {
> -               tipc_node_read_lock(n);
> -               bearer_id = n->active_links[selector & 1];
> -               if (bearer_id >= 0) {
> -                       le = &n->links[bearer_id];
> +again:
> +               prel_id = n->active_links[slc];
> +               if (prel_id >= 0) {
> +                       le = &n->links[prel_id];
>                         spin_lock_bh(&le->lock);
> -                       rc = tipc_link_xmit(le->link, list, &xmitq);
> +                       bearer_id = n->active_links[slc];
> +                       if (bearer_id == prel_id)
> +                               rc = tipc_link_xmit(le->link, list, &xmitq);
>                         spin_unlock_bh(&le->lock);
> +                       if (unlikely(bearer_id != prel_id))
> +                               goto again;
>                 }
> -               tipc_node_read_unlock(n);
>                 if (likely(!skb_queue_empty(&xmitq))) {
>                         tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
>                         return 0;
> @@ -1292,9 +1287,9 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
>
>         /* Broadcast ACKs are sent on a unicast link */
>         if (rc & TIPC_LINK_SND_BC_ACK) {
> -               tipc_node_read_lock(n);
> +               spin_lock_bh(&le->lock);
>                 tipc_link_build_ack_msg(le->link, &xmitq);
> -               tipc_node_read_unlock(n);
> +               spin_unlock_bh(&le->lock);
>         }
>
>         if (!skb_queue_empty(&xmitq))
> @@ -1487,16 +1482,14 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
>                 tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack);
>
>         /* Receive packet directly if conditions permit */
> -       tipc_node_read_lock(n);
> +       spin_lock_bh(&le->lock);
>         if (likely((n->state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) {
> -               spin_lock_bh(&le->lock);
>                 if (le->link) {
>                         rc = tipc_link_rcv(le->link, skb, &xmitq);
>                         skb = NULL;
>                 }
> -               spin_unlock_bh(&le->lock);
>         }
> -       tipc_node_read_unlock(n);
> +       spin_unlock_bh(&le->lock);
>
>         /* Check/update node state before receiving */
>         if (unlikely(skb)) {
> @@ -1573,15 +1566,15 @@ int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
>                                 continue;
>                 }
>
> -               tipc_node_read_lock(node);
> +               tipc_node_write_lock(node);
>                 err = __tipc_nl_add_node(&msg, node);
>                 if (err) {
>                         last_addr = node->addr;
> -                       tipc_node_read_unlock(node);
> +                       tipc_node_write_unlock(node);
>                         goto out;
>                 }
>
> -               tipc_node_read_unlock(node);
> +               tipc_node_write_unlock(node);
>         }
>         done = 1;
>  out:
> @@ -1612,7 +1605,7 @@ static struct tipc_node *tipc_node_find_by_name(struct net *net,
>         *bearer_id = 0;
>         rcu_read_lock();
>         list_for_each_entry_rcu(n, &tn->node_list, list) {
> -               tipc_node_read_lock(n);
> +               tipc_node_write_lock(n);
>                 for (i = 0; i < MAX_BEARERS; i++) {
>                         l = n->links[i].link;
>                         if (l && !strcmp(tipc_link_name(l), link_name)) {
> @@ -1621,7 +1614,7 @@ static struct tipc_node *tipc_node_find_by_name(struct net *net,
>                                 break;
>                         }
>                 }
> -               tipc_node_read_unlock(n);
> +               tipc_node_write_unlock(n);
>                 if (found_node)
>                         break;
>         }
> @@ -1662,8 +1655,7 @@ int tipc_nl_node_set_link(struct sk_buff *skb, struct genl_info *info)
>         if (!node)
>                 return -EINVAL;
>
> -       tipc_node_read_lock(node);
> -
> +       spin_lock_bh(&node->links[bearer_id].lock);
>         link = node->links[bearer_id].link;
>         if (!link) {
>                 res = -EINVAL;
> @@ -1701,7 +1693,7 @@ int tipc_nl_node_set_link(struct sk_buff *skb, struct genl_info *info)
>         }
>
>  out:
> -       tipc_node_read_unlock(node);
> +       spin_unlock_bh(&node->links[bearer_id].lock);
>
>         return res;
>  }
> @@ -1738,17 +1730,16 @@ int tipc_nl_node_get_link(struct sk_buff *skb, struct genl_info *info)
>                 node = tipc_node_find_by_name(net, name, &bearer_id);
>                 if (!node)
>                         return -EINVAL;
> -
> -               tipc_node_read_lock(node);
> +               spin_lock_bh(&node->links[bearer_id].lock);
>                 link = node->links[bearer_id].link;
>                 if (!link) {
> -                       tipc_node_read_unlock(node);
> +                       spin_unlock_bh(&node->links[bearer_id].lock);
>                         nlmsg_free(msg.skb);
>                         return -EINVAL;
>                 }
>
>                 err = __tipc_nl_add_link(net, &msg, link, 0);
> -               tipc_node_read_unlock(node);
> +               spin_unlock_bh(&node->links[bearer_id].lock);
>                 if (err) {
>                         nlmsg_free(msg.skb);
>                         return err;
> @@ -1795,17 +1786,14 @@ int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info)
>                 return -EINVAL;
>
>         le = &node->links[bearer_id];
> -       tipc_node_read_lock(node);
>         spin_lock_bh(&le->lock);
>         link = node->links[bearer_id].link;
>         if (!link) {
>                 spin_unlock_bh(&le->lock);
> -               tipc_node_read_unlock(node);
>                 return -EINVAL;
>         }
>         tipc_link_reset_stats(link);
>         spin_unlock_bh(&le->lock);
> -       tipc_node_read_unlock(node);
>         return 0;
>  }
>
> @@ -1867,10 +1855,10 @@ int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb)
>
>                 list_for_each_entry_continue_rcu(node, &tn->node_list,
>                                                  list) {
> -                       tipc_node_read_lock(node);
> +                       tipc_node_write_lock(node);
>                         err = __tipc_nl_add_node_links(net, &msg, node,
>                                                        &prev_link);
> -                       tipc_node_read_unlock(node);
> +                       tipc_node_write_unlock(node);
>                         if (err)
>                                 goto out;
>
> @@ -1882,10 +1870,10 @@ int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb)
>                         goto out;
>
>                 list_for_each_entry_rcu(node, &tn->node_list, list) {
> -                       tipc_node_read_lock(node);
> +                       tipc_node_write_lock(node);
>                         err = __tipc_nl_add_node_links(net, &msg, node,
>                                                        &prev_link);
> -                       tipc_node_read_unlock(node);
> +                       tipc_node_write_unlock(node);
>                         if (err)
>                                 goto out;
>
> --
> 2.1.4
>

_______________________________________________
tipc-discussion mailing list
tipc-discussion@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tipc-discussion
