We sometimes observe a 'deadly embrace' type deadlock occurring
between mutually connected sockets on the same node. This happens
when the one-hour peer supervision timers happen to expire
simultaneously in both sockets.

The scenario is as follows:

CPU 1:                          CPU 2:
--------                        --------
tipc_sk_timeout(sk1)            tipc_sk_timeout(sk2)
  lock(sk1.slock)                 lock(sk2.slock)
  msg_create(probe)               msg_create(probe)
  unlock(sk1.slock)               unlock(sk2.slock)
  tipc_node_xmit_skb()            tipc_node_xmit_skb()
    tipc_node_xmit()                tipc_node_xmit()
      tipc_sk_rcv(sk2)                tipc_sk_rcv(sk1)
        lock(sk2.slock)                 lock(sk1.slock)
        filter_rcv()                    filter_rcv()
          tipc_sk_proto_rcv()             tipc_sk_proto_rcv()
            msg_create(probe_rsp)           msg_create(probe_rsp)
            tipc_sk_respond()               tipc_sk_respond()
              tipc_node_xmit_skb()            tipc_node_xmit_skb()
                tipc_node_xmit()                tipc_node_xmit()
                  tipc_sk_rcv(sk1)                tipc_sk_rcv(sk2)
                    lock(sk1.slock)                 lock(sk2.slock)
                    ===> DEADLOCK                   ===> DEADLOCK

Further analysis reveals that there are at least three different
locations in the socket code where tipc_sk_respond() is called within
the context of the socket lock, with ensuing risk of similar deadlocks.

We now solve this by passing a buffer queue along with all upcalls where
sk_lock.slock may potentially be held. Response or rejected messages are
accumulated into this queue instead of being sent out directly, and sent
out once we know we are safely outside the sk_lock.slock context.

v2: - Testing on mutex sk_lock.owned instead of sk_lock.slock in
      tipc_sk_respond(). This is safer, since sk_lock.slock may
      occasionally and briefly be held (by concurrent user contexts)
      even if we are in user context.

v3: - By lowering the socket timeout to 36 ms instead of 3,600,000 ms and
      setting up 1000 connections I could easily reproduce the deadlock
      and verify that my solution works.
    - When killing one of the processes I sometimes got a kernel crash
      in the loop emptying the socket write queue. Realizing that there
      may be concurrent processes emptying the write queue, I had to add
      a test that the dequeuing actually returned a buffer. This solved
      the problem.
    - I tried Ying's suggestion with unconditionally adding all
      CONN_MANAGER messages to the backlog queue, and it didn't work.
      This is because we will often add the message to the backlog when
      the socket is *not* owned, so there will be nothing triggering
      execution of backlog_rcv() within acceptable time. Apart from
      that, my solution solves the problem at all three locations where
      this deadlock may happen, as already stated above.

v4: - Introduced separate queue in struct tipc_sock for the purpose
      above, instead of using the socket send queue. The socket send
      queue was used for regular message sending until commit
      f214fc402967e ("tipc: Revert 'tipc: use existing sk_write_queue
      for outgoing packet chain'"), i.e. as recently as kernel 4.5, so
      using this queue would screw up older kernel versions.
    - Made small cosmetic improvement to the dequeuing loop.

v5: - Was convinced by Ying that not even v4 above is 100% safe. Now
      (re-)introducing a solution that I believe is the only really safe
      one.

Reported-by: GUNA <[email protected]>
Signed-off-by: Jon Maloy <[email protected]>
---
 net/tipc/socket.c | 54 ++++++++++++++++++++++++++++++++++++++++++------------
 1 file changed, 42 insertions(+), 12 deletions(-)

diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 88bfcd7..c49b8df 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -796,9 +796,11 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
  * @tsk: receiving socket
  * @skb: pointer to message buffer.
  */
-static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
+static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
+                             struct sk_buff_head *xmitq)
 {
        struct sock *sk = &tsk->sk;
+       u32 onode = tsk_own_node(tsk);
        struct tipc_msg *hdr = buf_msg(skb);
        int mtyp = msg_type(hdr);
        bool conn_cong;
@@ -811,7 +813,8 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
 
        if (mtyp == CONN_PROBE) {
                msg_set_type(hdr, CONN_PROBE_REPLY);
-               tipc_sk_respond(sk, skb, TIPC_OK);
+               if (tipc_msg_reverse(onode, &skb, TIPC_OK))
+                       __skb_queue_tail(xmitq, skb);
                return;
        } else if (mtyp == CONN_ACK) {
                conn_cong = tsk_conn_cong(tsk);
@@ -1686,7 +1689,8 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
  *
  * Returns true if message was added to socket receive queue, otherwise false
  */
-static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
+static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
+                      struct sk_buff_head *xmitq)
 {
        struct socket *sock = sk->sk_socket;
        struct tipc_sock *tsk = tipc_sk(sk);
@@ -1696,7 +1700,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
        int usr = msg_user(hdr);
 
        if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
-               tipc_sk_proto_rcv(tsk, skb);
+               tipc_sk_proto_rcv(tsk, skb, xmitq);
                return false;
        }
 
@@ -1739,7 +1743,8 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
        return true;
 
 reject:
-       tipc_sk_respond(sk, skb, err);
+       if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err))
+               __skb_queue_tail(xmitq, skb);
        return false;
 }
 
@@ -1755,9 +1760,24 @@ reject:
 static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
 {
        unsigned int truesize = skb->truesize;
+       struct sk_buff_head xmitq;
+       u32 dnode, selector;
 
-       if (likely(filter_rcv(sk, skb)))
+       __skb_queue_head_init(&xmitq);
+
+       if (likely(filter_rcv(sk, skb, &xmitq))) {
                atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
+               return 0;
+       }
+
+       if (skb_queue_empty(&xmitq))
+               return 0;
+
+       /* Send response/rejected message */
+       skb = __skb_dequeue(&xmitq);
+       dnode = msg_destnode(buf_msg(skb));
+       selector = msg_origport(buf_msg(skb));
+       tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
        return 0;
 }
 
@@ -1771,12 +1791,13 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
  * Caller must hold socket lock
  */
 static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
-                           u32 dport)
+                           u32 dport, struct sk_buff_head *xmitq)
 {
+       unsigned long time_limit = jiffies + 2;
+       struct sk_buff *skb;
        unsigned int lim;
        atomic_t *dcnt;
-       struct sk_buff *skb;
-       unsigned long time_limit = jiffies + 2;
+       u32 onode;
 
        while (skb_queue_len(inputq)) {
                if (unlikely(time_after_eq(jiffies, time_limit)))
@@ -1788,7 +1809,7 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
 
                /* Add message directly to receive queue if possible */
                if (!sock_owned_by_user(sk)) {
-                       filter_rcv(sk, skb);
+                       filter_rcv(sk, skb, xmitq);
                        continue;
                }
 
@@ -1801,7 +1822,9 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
                        continue;
 
                /* Overload => reject message back to sender */
-               tipc_sk_respond(sk, skb, TIPC_ERR_OVERLOAD);
+               onode = tipc_own_addr(sock_net(sk));
+               if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD))
+                       __skb_queue_tail(xmitq, skb);
                break;
        }
 }
@@ -1814,12 +1837,14 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
  */
 void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
 {
+       struct sk_buff_head xmitq;
        u32 dnode, dport = 0;
        int err;
        struct tipc_sock *tsk;
        struct sock *sk;
        struct sk_buff *skb;
 
+       __skb_queue_head_init(&xmitq);
        while (skb_queue_len(inputq)) {
                dport = tipc_skb_peek_port(inputq, dport);
                tsk = tipc_sk_lookup(net, dport);
@@ -1827,9 +1852,14 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
                if (likely(tsk)) {
                        sk = &tsk->sk;
                        if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
-                               tipc_sk_enqueue(inputq, sk, dport);
+                               tipc_sk_enqueue(inputq, sk, dport, &xmitq);
                                spin_unlock_bh(&sk->sk_lock.slock);
                        }
+                       /* Send pending response/rejected messages, if any */
+                       while ((skb = __skb_dequeue(&xmitq))) {
+                               dnode = msg_destnode(buf_msg(skb));
+                               tipc_node_xmit_skb(net, skb, dnode, dport);
+                       }
                        sock_put(sk);
                        continue;
                }
-- 
1.9.1


------------------------------------------------------------------------------
What NetFlow Analyzer can do for you? Monitors network bandwidth and traffic
patterns at an interface-level. Reveals which users, apps, and protocols are 
consuming the most bandwidth. Provides multi-vendor support for NetFlow, 
J-Flow, sFlow and other flows. Make informed decisions using capacity planning
reports. http://pubads.g.doubleclick.net/gampad/clk?id=1444514421&iu=/41014381
_______________________________________________
tipc-discussion mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to