Thanks for your explanation.

Please add my "Acked-by".

Regards,
Ying

On 06/16/2016 02:18 AM, Xue, Ying wrote:
> Only one suggestion:
>
> Please declare filter_rcv() as:
> static bool filter_rcv(struct sock *sk, struct sk_buff *iskb, struct sk_buff *oskb)
>
> In other words, it's better to declare the third parameter as an skb pointer
> rather than as struct sk_buff_head *xmitq. This is because the number of skb
> buffers generated by filter_rcv() is never more than one. As a result, the
> changes to be made in tipc_backlog_rcv() would be much smaller.

I don't agree. I would save exactly one line of code in tipc_backlog_rcv(),
since I wouldn't have to dequeue the buffer; all the rest would remain the same.
The cost of this is that I would have to zero out the buffer pointer in
filter_rcv() and tipc_sk_proto_rcv(), plus add a test for whether or not there
is a buffer to be added to xmitq in tipc_sk_enqueue(). In brief, we would end
up with more code, not less.
Besides, this technique is consistent with how I do it in many other functions
in the code, independent of how many buffers may potentially be returned, and
it is easy to recognize and understand.
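
Just to illustrate the trade-off, here is a rough, untested sketch of what the
single-buffer variant of tipc_backlog_rcv() would have to look like. "oskb" is
only a placeholder name, and I am assuming the parameter would have to be a
struct sk_buff ** so that filter_rcv() can hand the response buffer back:

static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
{
        unsigned int truesize = skb->truesize;
        struct sk_buff *oskb;   /* must be zeroed inside filter_rcv()
                                 * and tipc_sk_proto_rcv()
                                 */
        u32 dnode, selector;

        if (likely(filter_rcv(sk, skb, &oskb))) {
                atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
                return 0;
        }

        /* The __skb_dequeue() call goes away, but this test is still
         * needed here, and a similar one in tipc_sk_enqueue().
         */
        if (!oskb)
                return 0;

        /* Send response/rejected message */
        dnode = msg_destnode(buf_msg(oskb));
        selector = msg_origport(buf_msg(oskb));
        tipc_node_xmit_skb(sock_net(sk), oskb, dnode, selector);
        return 0;
}

Compared to the xmitq version in the patch below, the only line saved is the
__skb_dequeue() call.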

So if you don't have strong objections, I prefer to keep it this way.

Regards
///jon

>
> Regards,
> Ying
>
> -----Original Message-----
> From: Jon Maloy [mailto:[email protected]]
> Sent: Wednesday, June 15, 2016 10:46 PM
> To: [email protected]; 
> [email protected]; Xue, Ying; 
> [email protected]; [email protected]
> Cc: [email protected]; [email protected]
> Subject: [PATCH net v5 1/1] tipc: fix socket timer deadlock
>
> We sometimes observe a 'deadly embrace' type deadlock occurring between 
> mutually connected sockets on the same node. This happens when the one-hour 
> peer supervision timers happen to expire simultaneously in both sockets.
>
> The scenario is as follows:
>
> CPU 1:                          CPU 2:
> --------                        --------
> tipc_sk_timeout(sk1)            tipc_sk_timeout(sk2)
>    lock(sk1.slock)                 lock(sk2.slock)
>    msg_create(probe)               msg_create(probe)
>    unlock(sk1.slock)               unlock(sk2.slock)
>    tipc_node_xmit_skb()            tipc_node_xmit_skb()
>      tipc_node_xmit()                tipc_node_xmit()
>        tipc_sk_rcv(sk2)                tipc_sk_rcv(sk1)
>          lock(sk2.slock)                 lock(sk1.slock)
>          filter_rcv()                    filter_rcv()
>            tipc_sk_proto_rcv()             tipc_sk_proto_rcv()
>              msg_create(probe_rsp)           msg_create(probe_rsp)
>              tipc_sk_respond()               tipc_sk_respond()
>                tipc_node_xmit_skb()            tipc_node_xmit_skb()
>                  tipc_node_xmit()                tipc_node_xmit()
>                    tipc_sk_rcv(sk1)                tipc_sk_rcv(sk2)
>                      lock(sk1.slock)                 lock(sk2.slock)
>                      ===> DEADLOCK                   ===> DEADLOCK
>
> Further analysis reveals that there are at least three different locations in 
> the socket code where tipc_sk_respond() is called within the context of the 
> socket lock, with ensuing risk of similar deadlocks.
>
> We now solve this by passing a buffer queue along with all upcalls where 
> sk_lock.slock may potentially be held. Response or rejected messages are 
> accumulated into this queue instead of being sent out directly, and sent out 
> once we know we are safely outside the sk_lock.slock context.
>
> v2: - Testing on mutex sk_lock.owned instead of sk_lock.slock in
>        tipc_sk_respond(). This is safer, since sk_lock.slock may
>        occasionally and briefly be held (by concurrent user contexts)
>        even if we are in user context.
>
> v3: - By lowering the socket timeout to 36 ms instead of 3,600,000 and
>        setting up 1000 connections I could easily reproduce the deadlock
>        and verify that my solution works.
>      - When killing one of the processes I sometimes got a kernel crash
>        in the loop emptying the socket write queue. Realizing that there
>        may be concurrent processes emptying the write queue, I had to add
>        a test that the dequeuing actually returned a buffer. This solved
>        the problem.
>      - I tried Ying's suggestion of unconditionally adding all
>        CONN_MANAGER messages to the backlog queue, and it didn't work.
>        This is because we will often add the message to the backlog when
>        the socket is *not* owned, so there will be nothing triggering
>        execution of backlog_rcv() within acceptable time. Apart from
>        that, my solution solves the problem at all three locations where
>        this deadlock may happen, as already stated above.
>
> v4: - Introduced separate queue in struct tipc_sock for the purpose
>        above, instead of using the socket send queue. The socket send
>        queue was used for regular message sending until commit
>        f214fc402967e ("tipc: Revert tipc: use existing sk_write_queue for
>        outgoing packet chain"), i.e. as recently as kernel 4.5, so using
>        this queue would screw up older kernel versions.
>      - Made small cosmetic improvement to the dequeuing loop.
>
> v5: - Was convinced by Ying that not even v4 above is 100% safe. Now
>        (re-)introducing a solution that I believe is the only really safe
>        one.
>
> Reported-by: GUNA <[email protected]>
> Signed-off-by: Jon Maloy <[email protected]>
> ---
>   net/tipc/socket.c | 54 ++++++++++++++++++++++++++++++++++++++++++------------
>   1 file changed, 42 insertions(+), 12 deletions(-)
>
> diff --git a/net/tipc/socket.c b/net/tipc/socket.c
> index 88bfcd7..c49b8df 100644
> --- a/net/tipc/socket.c
> +++ b/net/tipc/socket.c
> @@ -796,9 +796,11 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
>    * @tsk: receiving socket
>    * @skb: pointer to message buffer.
>    */
> -static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
> +static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
> +                           struct sk_buff_head *xmitq)
>   {
>       struct sock *sk = &tsk->sk;
> +     u32 onode = tsk_own_node(tsk);
>       struct tipc_msg *hdr = buf_msg(skb);
>       int mtyp = msg_type(hdr);
>       bool conn_cong;
> @@ -811,7 +813,8 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
>   
>       if (mtyp == CONN_PROBE) {
>               msg_set_type(hdr, CONN_PROBE_REPLY);
> -             tipc_sk_respond(sk, skb, TIPC_OK);
> +             if (tipc_msg_reverse(onode, &skb, TIPC_OK))
> +                     __skb_queue_tail(xmitq, skb);
>               return;
>       } else if (mtyp == CONN_ACK) {
>               conn_cong = tsk_conn_cong(tsk);
> @@ -1686,7 +1689,8 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
>    *
>    * Returns true if message was added to socket receive queue, otherwise false
>    */
> -static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
> +static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
> +                    struct sk_buff_head *xmitq)
>   {
>       struct socket *sock = sk->sk_socket;
>       struct tipc_sock *tsk = tipc_sk(sk);
> @@ -1696,7 +1700,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
>       int usr = msg_user(hdr);
>   
>       if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
> -             tipc_sk_proto_rcv(tsk, skb);
> +             tipc_sk_proto_rcv(tsk, skb, xmitq);
>               return false;
>       }
>   
> @@ -1739,7 +1743,8 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
>       return true;
>   
>   reject:
> -     tipc_sk_respond(sk, skb, err);
> +     if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err))
> +             __skb_queue_tail(xmitq, skb);
>       return false;
>   }
>   
> @@ -1755,9 +1760,24 @@ reject:
>   static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
>   {
>       unsigned int truesize = skb->truesize;
> +     struct sk_buff_head xmitq;
> +     u32 dnode, selector;
>   
> -     if (likely(filter_rcv(sk, skb)))
> +     __skb_queue_head_init(&xmitq);
> +
> +     if (likely(filter_rcv(sk, skb, &xmitq))) {
>               atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
> +             return 0;
> +     }
> +
> +     if (skb_queue_empty(&xmitq))
> +             return 0;
> +
> +     /* Send response/rejected message */
> +     skb = __skb_dequeue(&xmitq);
> +     dnode = msg_destnode(buf_msg(skb));
> +     selector = msg_origport(buf_msg(skb));
> +     tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
>       return 0;
>   }
>   
> @@ -1771,12 +1791,13 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
>    * Caller must hold socket lock
>    */
>   static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
> -                         u32 dport)
> +                         u32 dport, struct sk_buff_head *xmitq)
>   {
> +     unsigned long time_limit = jiffies + 2;
> +     struct sk_buff *skb;
>       unsigned int lim;
>       atomic_t *dcnt;
> -     struct sk_buff *skb;
> -     unsigned long time_limit = jiffies + 2;
> +     u32 onode;
>   
>       while (skb_queue_len(inputq)) {
>               if (unlikely(time_after_eq(jiffies, time_limit)))
> @@ -1788,7 +1809,7 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
>   
>               /* Add message directly to receive queue if possible */
>               if (!sock_owned_by_user(sk)) {
> -                     filter_rcv(sk, skb);
> +                     filter_rcv(sk, skb, xmitq);
>                       continue;
>               }
>   
> @@ -1801,7 +1822,9 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
>                       continue;
>   
>               /* Overload => reject message back to sender */
> -             tipc_sk_respond(sk, skb, TIPC_ERR_OVERLOAD);
> +             onode = tipc_own_addr(sock_net(sk));
> +             if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD))
> +                     __skb_queue_tail(xmitq, skb);
>               break;
>       }
>   }
> @@ -1814,12 +1837,14 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
>    */
>   void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
>   {
> +     struct sk_buff_head xmitq;
>       u32 dnode, dport = 0;
>       int err;
>       struct tipc_sock *tsk;
>       struct sock *sk;
>       struct sk_buff *skb;
>   
> +     __skb_queue_head_init(&xmitq);
>       while (skb_queue_len(inputq)) {
>               dport = tipc_skb_peek_port(inputq, dport);
>               tsk = tipc_sk_lookup(net, dport);
> @@ -1827,9 +1852,14 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
>               if (likely(tsk)) {
>                       sk = &tsk->sk;
>                       if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
> -                             tipc_sk_enqueue(inputq, sk, dport);
> +                             tipc_sk_enqueue(inputq, sk, dport, &xmitq);
>                               spin_unlock_bh(&sk->sk_lock.slock);
>                       }
> +                     /* Send pending response/rejected messages, if any */
> +                     while ((skb = __skb_dequeue(&xmitq))) {
> +                             dnode = msg_destnode(buf_msg(skb));
> +                             tipc_node_xmit_skb(net, skb, dnode, dport);
> +                     }
>                       sock_put(sk);
>                       continue;
>               }
> --
> 1.9.1
>

