As a preparation for the introduction of GSO we need to eliminate the assumption that there is a one-to-one relation between queued/sent sk_buffs and ditto sequence numbered packets.
A GSO prepared buffer may in the future represent many message fragments, each of which having their own packet sequence number. We therefore prepare message buffers of type FIRST_FRAGMENT so that they may contain a complete message, comprising all its potential fragments, -not only the first one. We do this by adding a "packet count" field and a corresponding "last_seqno()" function to the header of such messages, and adapt the link transmission and reception code to handle them correctly. Note that we don't actually introduce multi-packet buffers in this commit, -only some of the necessary logic to handle such packets. Signed-off-by: Jon Maloy <jon.ma...@ericsson.com> --- net/tipc/link.c | 72 +++++++++++++++++++++++++++++++-------------------- net/tipc/msg.c | 25 ++++++++++---------- net/tipc/msg.h | 18 +++++++++++++++ 3 files changed, 70 insertions(+), 45 deletions(-) diff --git a/net/tipc/link.c b/net/tipc/link.c index f16219c..e205347 100644 --- a/net/tipc/link.c +++ b/net/tipc/link.c @@ -158,6 +158,7 @@ struct tipc_link { /* Sending */ struct sk_buff_head transmq; struct sk_buff_head backlogq; + u16 transmq_len; struct { u16 len; u16 limit; @@ -942,6 +943,7 @@ void tipc_link_reset(struct tipc_link *l) l->reasm_buf = NULL; l->reasm_tnlmsg = NULL; l->failover_reasm_skb = NULL; + l->transmq_len = 0; l->rcv_unacked = 0; l->snd_nxt = 1; l->rcv_nxt = 1; @@ -975,11 +977,11 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; u16 ack = l->rcv_nxt - 1; u16 seqno = l->snd_nxt; - int pkt_cnt = skb_queue_len(list); int imp = msg_importance(hdr); unsigned int mss = tipc_link_mss(l); unsigned int cwin = l->window; unsigned int mtu = l->mtu; + unsigned int pktcnt; bool new_bundle; int rc = 0; @@ -990,7 +992,6 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, __skb_queue_purge(list); return -EMSGSIZE; } - /* Allow oversubscription of one data msg per source at congestion */ if 
(unlikely(l->backlog[imp].len >= l->backlog[imp].limit)) { if (imp == TIPC_SYSTEM_IMPORTANCE) { @@ -1000,15 +1001,15 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, rc = link_schedule_user(l, hdr); } - if (pkt_cnt > 1) { - l->stats.sent_fragmented++; - l->stats.sent_fragments += pkt_cnt; - } - /* Prepare each packet for sending, and add to relevant queue: */ while ((skb = __skb_dequeue(list))) { - if (likely(skb_queue_len(transmq) < cwin)) { - hdr = buf_msg(skb); + hdr = buf_msg(skb); + pktcnt = msg_pktcnt(hdr); + if (msg_user(hdr) == MSG_FRAGMENTER) { + l->stats.sent_fragmented++; + l->stats.sent_fragments += skb_shinfo(skb)->gso_segs; + } + if (likely(l->transmq_len < cwin)) { msg_set_seqno(hdr, seqno); msg_set_ack(hdr, ack); msg_set_bcast_ack(hdr, bc_ack); @@ -1019,14 +1020,13 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, return -ENOBUFS; } __skb_queue_tail(transmq, skb); - /* next retransmit attempt */ - if (link_is_bc_sndlink(l)) - TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM; + l->transmq_len += pktcnt; __skb_queue_tail(xmitq, _skb); TIPC_SKB_CB(skb)->ackers = l->ackers; + if (link_is_bc_sndlink(l)) + TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM; l->rcv_unacked = 0; - l->stats.sent_pkts++; - seqno++; + seqno += pktcnt; continue; } if (tipc_msg_try_bundle(l->backlog[imp].target_bskb, &skb, @@ -1046,10 +1046,10 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list, continue; } l->backlog[imp].target_bskb = NULL; - l->backlog[imp].len += (1 + skb_queue_len(list)); + l->backlog[imp].len += pktcnt; __skb_queue_tail(backlogq, skb); - skb_queue_splice_tail_init(list, backlogq); } + l->stats.sent_pkts += mod(seqno - l->snd_nxt); l->snd_nxt = seqno; return rc; } @@ -1118,23 +1118,22 @@ static void tipc_link_advance_backlog(struct tipc_link *l, __skb_dequeue(&l->backlogq); hdr = buf_msg(skb); imp = msg_importance(hdr); - l->backlog[imp].len--; + l->backlog[imp].len -= msg_pktcnt(hdr); if (unlikely(skb == 
l->backlog[imp].target_bskb)) l->backlog[imp].target_bskb = NULL; __skb_queue_tail(&l->transmq, skb); - /* next retransmit attempt */ + l->transmq_len += msg_pktcnt(hdr); if (link_is_bc_sndlink(l)) TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM; - __skb_queue_tail(xmitq, _skb); TIPC_SKB_CB(skb)->ackers = l->ackers; msg_set_seqno(hdr, seqno); msg_set_ack(hdr, ack); msg_set_bcast_ack(hdr, bc_ack); l->rcv_unacked = 0; - l->stats.sent_pkts++; - seqno++; + seqno += msg_pktcnt(hdr); } + l->stats.sent_pkts += mod(seqno - l->snd_nxt); l->snd_nxt = seqno; } @@ -1202,8 +1201,6 @@ static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r, u16 from, u16 to, struct sk_buff_head *xmitq) { struct sk_buff *_skb, *skb = skb_peek(&l->transmq); - u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1; - u16 ack = l->rcv_nxt - 1; int retransmitted = 0; struct tipc_msg *hdr; int rc = 0; @@ -1230,9 +1227,6 @@ static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r, _skb = pskb_copy(skb, GFP_ATOMIC); if (!_skb) return 0; - hdr = buf_msg(_skb); - msg_set_ack(hdr, ack); - msg_set_bcast_ack(hdr, bc_ack); _skb->priority = TC_PRIO_CONTROL; __skb_queue_tail(xmitq, _skb); l->stats.retransmitted++; @@ -1406,13 +1400,18 @@ static int tipc_link_release_pkts(struct tipc_link *l, u16 acked) { int released = 0; struct sk_buff *skb, *tmp; + struct tipc_msg *hdr; + int pktcnt; skb_queue_walk_safe(&l->transmq, skb, tmp) { - if (more(buf_seqno(skb), acked)) + hdr = buf_msg(skb); + pktcnt = msg_pktcnt(hdr); + if (more(msg_last_seqno(hdr), acked)) break; __skb_unlink(skb, &l->transmq); kfree_skb(skb); - released++; + l->transmq_len -= pktcnt; + released += pktcnt; } return released; } @@ -1486,17 +1485,20 @@ static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap, bool passed = false; u16 released = 0; u16 seqno, n = 0; + int pktcnt; int rc = 0; skb_queue_walk_safe(&l->transmq, skb, tmp) { - seqno = buf_seqno(skb); + seqno = msg_last_seqno(buf_msg(skb)); next_gap_ack: 
if (less_eq(seqno, acked)) { /* release skb */ + pktcnt = msg_pktcnt(buf_msg(skb)); __skb_unlink(skb, &l->transmq); kfree_skb(skb); - released++; + l->transmq_len -= pktcnt; + released += pktcnt; } else if (less_eq(seqno, acked + gap)) { /* First, check if repeated retrans failures occurs? */ if (!passed && link_retransmit_failure(l, l, &rc)) @@ -1622,7 +1624,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, struct sk_buff_head *defq = &l->deferdq; struct tipc_msg *hdr = buf_msg(skb); u16 seqno, rcv_nxt, win_lim; - int released = 0; + int pktcnt, released = 0; int rc = 0; /* Verify and update link state */ @@ -1635,6 +1637,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, do { hdr = buf_msg(skb); seqno = msg_seqno(hdr); + pktcnt = msg_pktcnt(hdr); rcv_nxt = l->rcv_nxt; win_lim = rcv_nxt + TIPC_MAX_LINK_WIN; @@ -1661,14 +1664,15 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, } /* Deliver packet */ - l->rcv_nxt++; + l->rcv_nxt += pktcnt; l->stats.recv_pkts++; + l->rcv_unacked += pktcnt; if (unlikely(msg_user(hdr) == TUNNEL_PROTOCOL)) rc |= tipc_link_tnl_rcv(l, skb, l->inputq); else if (!tipc_data_input(l, skb, l->inputq)) rc |= tipc_link_input(l, skb, l->inputq, &l->reasm_buf); - if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) + if (unlikely(l->rcv_unacked >= TIPC_MIN_LINK_WIN)) rc |= tipc_link_build_state_msg(l, xmitq); if (unlikely(rc & ~TIPC_LINK_SND_STATE)) break; @@ -1813,6 +1817,7 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, if (!tnl) return; + skb_queue_head_init(&tnlq); skb_queue_head_init(&tmpxq); @@ -2286,6 +2291,7 @@ void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, break; if (!--TIPC_SKB_CB(skb)->ackers) { __skb_unlink(skb, &snd_l->transmq); + snd_l->transmq_len -= msg_pktcnt(buf_msg(skb)); kfree_skb(skb); } } diff --git a/net/tipc/msg.c b/net/tipc/msg.c index 812334d..a70d8a9 100644 --- a/net/tipc/msg.c +++ b/net/tipc/msg.c @@ -425,6 +425,7 @@ int 
tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset, FIRST_FRAGMENT, INT_H_SIZE, msg_destnode(mhdr)); msg_set_size(&pkthdr, pktmax); msg_set_fragm_no(&pkthdr, pktno); + msg_set_pktcnt(&pkthdr, 1); msg_set_importance(&pkthdr, msg_importance(mhdr)); /* Prepare first fragment */ @@ -828,27 +829,27 @@ bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg, * @seqno: sequence number of buffer to add * @skb: buffer to add */ -void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno, +void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 first, struct sk_buff *skb) { + u16 last = msg_last_seqno(buf_msg(skb)); struct sk_buff *_skb, *tmp; + struct tipc_msg *_hdr; + u16 _first, _last; - if (skb_queue_empty(list) || less(seqno, buf_seqno(skb_peek(list)))) { + if (skb_queue_empty(list) || less(last, buf_seqno(skb_peek(list)))) { __skb_queue_head(list, skb); return; } - - if (more(seqno, buf_seqno(skb_peek_tail(list)))) { - __skb_queue_tail(list, skb); - return; - } - - skb_queue_walk_safe(list, _skb, tmp) { - if (more(seqno, buf_seqno(_skb))) + skb_queue_reverse_walk_safe(list, _skb, tmp) { + _hdr = buf_msg(_skb); + _first = msg_seqno(_hdr); + _last = msg_last_seqno(_hdr); + if (less(last, _first)) continue; - if (seqno == buf_seqno(_skb)) + if (!less(first, _first) && !more(last, _last)) break; - __skb_queue_before(list, _skb, skb); + __skb_queue_after(list, _skb, skb); return; } kfree_skb(skb); diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 2197f64..1b5c8c8 100644 --- a/net/tipc/msg.h +++ b/net/tipc/msg.h @@ -709,6 +709,24 @@ static inline void msg_set_node_capabilities(struct tipc_msg *m, u32 n) msg_set_bits(m, 1, 15, 0x1fff, n); } +static inline u16 msg_pktcnt(struct tipc_msg *m) +{ + if (likely(msg_user(m) != MSG_FRAGMENTER || + msg_type(m) != FIRST_FRAGMENT)) + return 1; + return msg_bits(m, 1, 23, 0x3f); +} + +static inline void msg_set_pktcnt(struct tipc_msg *m, u32 n) +{ + msg_set_bits(m, 1, 23, 0x3f, n); +} + +static inline u16 
msg_last_seqno(struct tipc_msg *m) +{ + return msg_seqno(m) + msg_pktcnt(m) - 1; +} + /* * Word 2 */ -- 2.1.4 _______________________________________________ tipc-discussion mailing list tipc-discussion@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/tipc-discussion