On 06/16/2016 02:18 AM, Xue, Ying wrote:
> Only one suggestion:
>
> Please declare filter_rcv() as:
> static bool filter_rcv(struct sock *sk, struct sk_buff *iskb,
>                        struct sk_buff *oskb)
>
> In other words, it is better to declare the third parameter as an skb
> pointer rather than as struct sk_buff_head *xmitq, because the number of
> skb buffers generated by filter_rcv() is never more than one. As a result,
> the changes to be made in tipc_backlog_rcv() will be much smaller.
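For illustration only -- not code from the thread or from the kernel tree --
here is a minimal sketch of the two calling conventions being discussed, seen
from a caller such as tipc_backlog_rcv(). All types and helpers below are
hypothetical user-space stand-ins for the sk_buff machinery, and the suggested
"struct sk_buff *oskb" is read here as an out parameter (struct sk_buff **),
which appears to be the intent.

/* Hypothetical stand-ins; not the kernel API. */
#include <stdbool.h>
#include <stdio.h>

struct skb { const char *data; };

struct skb_queue {              /* stand-in for struct sk_buff_head */
        struct skb *items[8];
        int len;
};

static void xmit(struct skb *b)
{
        printf("xmit: %s\n", b->data);
}

/* Variant A (the suggestion): at most one response, via an out parameter. */
static bool filter_rcv_oskb(struct skb *iskb, struct skb **oskb)
{
        static struct skb reply = { "reply" };

        (void)iskb;
        *oskb = &reply;         /* callee must remember to set or clear this */
        return false;
}

static void backlog_rcv_oskb(struct skb *skb)
{
        struct skb *oskb = NULL;

        filter_rcv_oskb(skb, &oskb);
        if (oskb)               /* every caller needs this extra NULL test */
                xmit(oskb);
}

/* Variant B (the patch as posted): responses go on a caller-owned queue. */
static bool filter_rcv_xmitq(struct skb *iskb, struct skb_queue *xmitq)
{
        static struct skb reply = { "reply" };

        (void)iskb;
        if (xmitq->len < 8)
                xmitq->items[xmitq->len++] = &reply;
        return false;
}

static void backlog_rcv_xmitq(struct skb *skb)
{
        struct skb_queue xmitq = { .len = 0 };
        int i;

        filter_rcv_xmitq(skb, &xmitq);
        for (i = 0; i < xmitq.len; i++) /* same drain loop as other callers */
                xmit(xmitq.items[i]);
}

int main(void)
{
        struct skb probe = { "probe" };

        backlog_rcv_oskb(&probe);
        backlog_rcv_xmitq(&probe);
        return 0;
}

The queue variant lets every caller use the same drain loop regardless of how
many buffers the callee produced, which is the consistency argument made in
the reply below; the out-parameter variant saves the drain loop but requires
the callee to set or clear the pointer and the caller to test it.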
I don't agree. I would save exactly one line of code in tipc_backlog_rcv(),
since I wouldn't have to dequeue the buffer; all the rest would remain the
same. The cost of this is that I would have to zero out the buffer pointer in
filter_rcv() and tipc_sk_proto_rcv(), plus add a test for whether or not
there is a buffer to be added to xmitq in tipc_sk_enqueue(). In brief, we
would end up with more code, not less.

Besides, this technique is consistent with how I do it in many other
functions in the code, independent of how many buffers may potentially be
returned, and it is easy to recognize and understand. So if you don't have
strong objections, I prefer to keep it this way.

Regards
///jon

> Regards,
> Ying
>
> -----Original Message-----
> From: Jon Maloy [mailto:[email protected]]
> Sent: Wednesday, June 15, 2016 10:46 PM
> To: [email protected]; [email protected];
>     Xue, Ying; [email protected]; [email protected]
> Cc: [email protected]; [email protected]
> Subject: [PATCH net v5 1/1] tipc: fix socket timer deadlock
>
> We sometimes observe a 'deadly embrace' type deadlock occurring between
> mutually connected sockets on the same node. This happens when the
> one-hour peer supervision timers happen to expire simultaneously in both
> sockets.
>
> The scenario is as follows:
>
> CPU 1:                                  CPU 2:
> --------                                --------
> tipc_sk_timeout(sk1)                    tipc_sk_timeout(sk2)
>   lock(sk1.slock)                         lock(sk2.slock)
>   msg_create(probe)                       msg_create(probe)
>   unlock(sk1.slock)                       unlock(sk2.slock)
>   tipc_node_xmit_skb()                    tipc_node_xmit_skb()
>     tipc_node_xmit()                        tipc_node_xmit()
>       tipc_sk_rcv(sk2)                        tipc_sk_rcv(sk1)
>         lock(sk2.slock)                         lock(sk1.slock)
>         filter_rcv()                            filter_rcv()
>           tipc_sk_proto_rcv()                     tipc_sk_proto_rcv()
>             msg_create(probe_rsp)                   msg_create(probe_rsp)
>             tipc_sk_respond()                       tipc_sk_respond()
>               tipc_node_xmit_skb()                    tipc_node_xmit_skb()
>                 tipc_node_xmit()                        tipc_node_xmit()
>                   tipc_sk_rcv(sk1)                        tipc_sk_rcv(sk2)
>                     lock(sk1.slock)                         lock(sk2.slock)
>                     ===> DEADLOCK                           ===> DEADLOCK
>
> Further analysis reveals that there are at least three different locations
> in the socket code where tipc_sk_respond() is called within the context of
> the socket lock, with ensuing risk of similar deadlocks.
>
> We now solve this by passing a buffer queue along with all upcalls where
> sk_lock.slock may potentially be held. Response or rejected messages are
> accumulated into this queue instead of being sent out directly, and sent
> out once we know we are safely outside the sk_lock.slock context.
>
> v2: - Testing on the mutex sk_lock.owned instead of sk_lock.slock in
>       tipc_sk_respond(). This is safer, since sk_lock.slock may
>       occasionally and briefly be held (by concurrent user contexts)
>       even if we are in user context.
>
> v3: - By lowering the socket timeout to 36 ms instead of 3,600,000 and
>       setting up 1000 connections I could easily reproduce the deadlock
>       and verify that my solution works.
>     - When killing one of the processes I sometimes got a kernel crash
>       in the loop emptying the socket write queue. Realizing that there
>       may be concurrent processes emptying the write queue, I had to add
>       a test that the dequeuing actually returned a buffer. This solved
>       the problem.
>     - I tried Ying's suggestion with unconditionally adding all
>       CONN_MANAGER messages to the backlog queue, and it didn't work.
>       This is because we will often add the message to the backlog when
>       the socket is *not* owned, so there will be nothing triggering
>       execution of backlog_rcv() within acceptable time.
>       Apart from that, my solution solves the problem at all three
>       locations where this deadlock may happen, as already stated above.
>
> v4: - Introduced separate queue in struct tipc_sock for the purpose
>       above, instead of using the socket send queue. The socket send
>       queue was used for regular message sending until commit
>       f214fc402967e ("tipc: Revert tipc: use existing sk_write_queue for
>       outgoing packet chain"), i.e. as recent as kernel 4.5, so using
>       this queue would screw up older kernel versions.
>     - Made small cosmetic improvement to the dequeuing loop.
>
> v5: - Was convinced by Ying that not even v4 above is 100% safe. Now
>       (re-)introducing a solution that I believe is the only really
>       safe one.
>
> Reported-by: GUNA <[email protected]>
> Signed-off-by: Jon Maloy <[email protected]>
> ---
>  net/tipc/socket.c | 54 ++++++++++++++++++++++++++++++++++++++++++------------
>  1 file changed, 42 insertions(+), 12 deletions(-)
>
> diff --git a/net/tipc/socket.c b/net/tipc/socket.c
> index 88bfcd7..c49b8df 100644
> --- a/net/tipc/socket.c
> +++ b/net/tipc/socket.c
> @@ -796,9 +796,11 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
>   * @tsk: receiving socket
>   * @skb: pointer to message buffer.
>   */
> -static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
> +static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
> +                              struct sk_buff_head *xmitq)
>  {
>          struct sock *sk = &tsk->sk;
> +        u32 onode = tsk_own_node(tsk);
>          struct tipc_msg *hdr = buf_msg(skb);
>          int mtyp = msg_type(hdr);
>          bool conn_cong;
> @@ -811,7 +813,8 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
>
>          if (mtyp == CONN_PROBE) {
>                  msg_set_type(hdr, CONN_PROBE_REPLY);
> -                tipc_sk_respond(sk, skb, TIPC_OK);
> +                if (tipc_msg_reverse(onode, &skb, TIPC_OK))
> +                        __skb_queue_tail(xmitq, skb);
>                  return;
>          } else if (mtyp == CONN_ACK) {
>                  conn_cong = tsk_conn_cong(tsk);
> @@ -1686,7 +1689,8 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
>   *
>   * Returns true if message was added to socket receive queue, otherwise false
>   */
> -static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
> +static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
> +                       struct sk_buff_head *xmitq)
>  {
>          struct socket *sock = sk->sk_socket;
>          struct tipc_sock *tsk = tipc_sk(sk);
> @@ -1696,7 +1700,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
>          int usr = msg_user(hdr);
>
>          if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
> -                tipc_sk_proto_rcv(tsk, skb);
> +                tipc_sk_proto_rcv(tsk, skb, xmitq);
>                  return false;
>          }
>
> @@ -1739,7 +1743,8 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb)
>          return true;
>
>  reject:
> -        tipc_sk_respond(sk, skb, err);
> +        if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err))
> +                __skb_queue_tail(xmitq, skb);
>          return false;
>  }
>
> @@ -1755,9 +1760,24 @@ reject:
>  static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
>  {
>          unsigned int truesize = skb->truesize;
> +        struct sk_buff_head xmitq;
> +        u32 dnode, selector;
>
> -        if (likely(filter_rcv(sk, skb)))
> +        __skb_queue_head_init(&xmitq);
> +
> +        if (likely(filter_rcv(sk, skb, &xmitq))) {
>                  atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
> +                return 0;
> +        }
> +
> +        if (skb_queue_empty(&xmitq))
> +                return 0;
> +
> +        /* Send response/rejected message */
> +        skb = __skb_dequeue(&xmitq);
> +        dnode = msg_destnode(buf_msg(skb));
> +        selector = msg_origport(buf_msg(skb));
> +        tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
>          return 0;
>  }
>
> @@ -1771,12 +1791,13 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
>   * Caller must hold socket lock
>   */
>  static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
> -                            u32 dport)
> +                            u32 dport, struct sk_buff_head *xmitq)
>  {
> +        unsigned long time_limit = jiffies + 2;
> +        struct sk_buff *skb;
>          unsigned int lim;
>          atomic_t *dcnt;
> -        struct sk_buff *skb;
> -        unsigned long time_limit = jiffies + 2;
> +        u32 onode;
>
>          while (skb_queue_len(inputq)) {
>                  if (unlikely(time_after_eq(jiffies, time_limit)))
> @@ -1788,7 +1809,7 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
>
>                  /* Add message directly to receive queue if possible */
>                  if (!sock_owned_by_user(sk)) {
> -                        filter_rcv(sk, skb);
> +                        filter_rcv(sk, skb, xmitq);
>                          continue;
>                  }
>
> @@ -1801,7 +1822,9 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
>                          continue;
>
>                  /* Overload => reject message back to sender */
> -                tipc_sk_respond(sk, skb, TIPC_ERR_OVERLOAD);
> +                onode = tipc_own_addr(sock_net(sk));
> +                if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD))
> +                        __skb_queue_tail(xmitq, skb);
>                  break;
>          }
>  }
> @@ -1814,12 +1837,14 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
>   */
>  void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
>  {
> +        struct sk_buff_head xmitq;
>          u32 dnode, dport = 0;
>          int err;
>          struct tipc_sock *tsk;
>          struct sock *sk;
>          struct sk_buff *skb;
>
> +        __skb_queue_head_init(&xmitq);
>          while (skb_queue_len(inputq)) {
>                  dport = tipc_skb_peek_port(inputq, dport);
>                  tsk = tipc_sk_lookup(net, dport);
> @@ -1827,9 +1852,14 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
>                  if (likely(tsk)) {
>                          sk = &tsk->sk;
>                          if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
> -                                tipc_sk_enqueue(inputq, sk, dport);
> +                                tipc_sk_enqueue(inputq, sk, dport, &xmitq);
>                                  spin_unlock_bh(&sk->sk_lock.slock);
>                          }
> +                        /* Send pending response/rejected messages, if any */
> +                        while ((skb = __skb_dequeue(&xmitq))) {
> +                                dnode = msg_destnode(buf_msg(skb));
> +                                tipc_node_xmit_skb(net, skb, dnode, dport);
> +                        }
>                          sock_put(sk);
>                          continue;
>                  }
> --
> 1.9.1
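To make the locking pattern of the patch concrete, here is a small,
self-contained user-space sketch -- hypothetical stand-ins only, not TIPC or
kernel code -- of the "accumulate responses while the lock is held, transmit
after unlock" technique. It mirrors how filter_rcv()/tipc_sk_proto_rcv() now
append to a caller-owned xmitq and how tipc_sk_rcv()/tipc_backlog_rcv() drain
that queue only after releasing the socket lock, which removes the nested
lock acquisition that caused the deadlock shown in the scenario above.

/* Hypothetical stand-ins for sk_buff_head, filter_rcv(), tipc_node_xmit_skb() */
#include <pthread.h>
#include <stdio.h>

struct msg {
        int dest;
        const char *text;
};

/* Stand-in for the caller-owned struct sk_buff_head xmitq */
struct msg_queue {
        struct msg items[8];
        int len;
};

struct sock_stub {
        pthread_mutex_t lock;
        int port;
};

/* Stand-in for tipc_node_xmit_skb(): must never run under sk->lock, since
 * delivery to a peer socket on the same node takes that peer's lock and can
 * close the 'deadly embrace' described in the patch. */
static void node_xmit(const struct msg *m)
{
        printf("sending '%s' to port %d\n", m->text, m->dest);
}

/* Stand-in for filter_rcv()/tipc_sk_proto_rcv(): instead of sending a probe
 * reply or a rejected message directly, it only appends the response to the
 * caller-owned queue. */
static void filter_rcv_stub(struct sock_stub *sk, const struct msg *in,
                            struct msg_queue *xmitq)
{
        (void)sk;
        if (xmitq->len < 8) {
                xmitq->items[xmitq->len].dest = in->dest;
                xmitq->items[xmitq->len].text = "CONN_PROBE_REPLY";
                xmitq->len++;
        }
}

/* Stand-in for tipc_sk_rcv()/tipc_backlog_rcv(): the queue is drained only
 * after the socket lock has been released. */
static void sk_rcv_stub(struct sock_stub *sk, const struct msg *in)
{
        struct msg_queue xmitq = { .len = 0 };
        int i;

        pthread_mutex_lock(&sk->lock);
        filter_rcv_stub(sk, in, &xmitq);
        pthread_mutex_unlock(&sk->lock);

        for (i = 0; i < xmitq.len; i++)
                node_xmit(&xmitq.items[i]);
}

int main(void)
{
        struct sock_stub sk = { PTHREAD_MUTEX_INITIALIZER, 1 };
        struct msg probe = { .dest = 2, .text = "CONN_PROBE" };

        sk_rcv_stub(&sk, &probe);
        return 0;
}

Build with 'gcc -pthread'; the only point of the sketch is that node_xmit()
runs outside the critical section, just as the patch drains xmitq after
spin_unlock_bh() or after filter_rcv() returns.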
