In preparation for the introduction of GSO, we need to eliminate the
assumption that there is a one-to-one relation between queued/sent
sk_buffs and sequence-numbered packets.

A GSO-prepared buffer may in the future represent many message
fragments, each with its own packet sequence number. We therefore
prepare message buffers of type FIRST_FRAGMENT so that they may
contain a complete message, comprising all of its potential fragments,
not only the first one. We do this by adding a "packet count" field
and a corresponding last_seqno() function to the header of such
messages, and adapt the link transmission and reception code to
handle them correctly.
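
In helper form, the idea boils down to roughly the following. This is
a minimal standalone sketch, not the actual TIPC code; demo_hdr and
demo_last_seqno are illustrative stand-ins for the msg_pktcnt() and
msg_last_seqno() accessors added to msg.h below:

    #include <stdint.h>
    #include <stdio.h>

    struct demo_hdr {
            uint16_t seqno;   /* sequence number of the first packet */
            uint16_t pktcnt;  /* number of packets covered, always >= 1 */
    };

    /* Last sequence number covered by the buffer; 16-bit wrap-around
     * arithmetic, as used for TIPC link sequence numbers.
     */
    static uint16_t demo_last_seqno(const struct demo_hdr *h)
    {
            return (uint16_t)(h->seqno + h->pktcnt - 1);
    }

    int main(void)
    {
            struct demo_hdr h = { .seqno = 100, .pktcnt = 4 };

            /* A single buffer now covers seqnos 100..103 */
            printf("first %u last %u\n", h.seqno, demo_last_seqno(&h));
            return 0;
    }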

Note that we don't actually introduce multi-packet buffers in this
commit, only some of the logic necessary to handle such packets.

Signed-off-by: Jon Maloy <jon.ma...@ericsson.com>
---
 net/tipc/link.c | 72 +++++++++++++++++++++++++++++++--------------------------
 net/tipc/msg.c  | 25 ++++++++++----------
 net/tipc/msg.h  | 18 +++++++++++++++
 3 files changed, 70 insertions(+), 45 deletions(-)

diff --git a/net/tipc/link.c b/net/tipc/link.c
index f16219c..e205347 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -158,6 +158,7 @@ struct tipc_link {
        /* Sending */
        struct sk_buff_head transmq;
        struct sk_buff_head backlogq;
+       u16 transmq_len;
        struct {
                u16 len;
                u16 limit;
@@ -942,6 +943,7 @@ void tipc_link_reset(struct tipc_link *l)
        l->reasm_buf = NULL;
        l->reasm_tnlmsg = NULL;
        l->failover_reasm_skb = NULL;
+       l->transmq_len = 0;
        l->rcv_unacked = 0;
        l->snd_nxt = 1;
        l->rcv_nxt = 1;
@@ -975,11 +977,11 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
        u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
        u16 ack = l->rcv_nxt - 1;
        u16 seqno = l->snd_nxt;
-       int pkt_cnt = skb_queue_len(list);
        int imp = msg_importance(hdr);
        unsigned int mss = tipc_link_mss(l);
        unsigned int cwin = l->window;
        unsigned int mtu = l->mtu;
+       unsigned int pktcnt;
        bool new_bundle;
        int rc = 0;
 
@@ -990,7 +992,6 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
                __skb_queue_purge(list);
                return -EMSGSIZE;
        }
-
        /* Allow oversubscription of one data msg per source at congestion */
        if (unlikely(l->backlog[imp].len >= l->backlog[imp].limit)) {
                if (imp == TIPC_SYSTEM_IMPORTANCE) {
@@ -1000,15 +1001,15 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
                rc = link_schedule_user(l, hdr);
        }
 
-       if (pkt_cnt > 1) {
-               l->stats.sent_fragmented++;
-               l->stats.sent_fragments += pkt_cnt;
-       }
-
        /* Prepare each packet for sending, and add to relevant queue: */
        while ((skb = __skb_dequeue(list))) {
-               if (likely(skb_queue_len(transmq) < cwin)) {
-                       hdr = buf_msg(skb);
+               hdr = buf_msg(skb);
+               pktcnt = msg_pktcnt(hdr);
+               if (msg_user(hdr) == MSG_FRAGMENTER) {
+                       l->stats.sent_fragmented++;
+                       l->stats.sent_fragments += skb_shinfo(skb)->gso_segs;
+               }
+               if (likely(l->transmq_len < cwin)) {
                        msg_set_seqno(hdr, seqno);
                        msg_set_ack(hdr, ack);
                        msg_set_bcast_ack(hdr, bc_ack);
@@ -1019,14 +1020,13 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
                                return -ENOBUFS;
                        }
                        __skb_queue_tail(transmq, skb);
-                       /* next retransmit attempt */
-                       if (link_is_bc_sndlink(l))
-                               TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM;
+                       l->transmq_len += pktcnt;
                        __skb_queue_tail(xmitq, _skb);
                        TIPC_SKB_CB(skb)->ackers = l->ackers;
+                       if (link_is_bc_sndlink(l))
+                               TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM;
                        l->rcv_unacked = 0;
-                       l->stats.sent_pkts++;
-                       seqno++;
+                       seqno += pktcnt;
                        continue;
                }
                if (tipc_msg_try_bundle(l->backlog[imp].target_bskb, &skb,
@@ -1046,10 +1046,10 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
                        continue;
                }
                l->backlog[imp].target_bskb = NULL;
-               l->backlog[imp].len += (1 + skb_queue_len(list));
+               l->backlog[imp].len += pktcnt;
                __skb_queue_tail(backlogq, skb);
-               skb_queue_splice_tail_init(list, backlogq);
        }
+       l->stats.sent_pkts += mod(seqno - l->snd_nxt);
        l->snd_nxt = seqno;
        return rc;
 }
@@ -1118,23 +1118,22 @@ static void tipc_link_advance_backlog(struct tipc_link *l,
                __skb_dequeue(&l->backlogq);
                hdr = buf_msg(skb);
                imp = msg_importance(hdr);
-               l->backlog[imp].len--;
+               l->backlog[imp].len -= msg_pktcnt(hdr);
                if (unlikely(skb == l->backlog[imp].target_bskb))
                        l->backlog[imp].target_bskb = NULL;
                __skb_queue_tail(&l->transmq, skb);
-               /* next retransmit attempt */
+               l->transmq_len += msg_pktcnt(hdr);
                if (link_is_bc_sndlink(l))
                        TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM;
-
                __skb_queue_tail(xmitq, _skb);
                TIPC_SKB_CB(skb)->ackers = l->ackers;
                msg_set_seqno(hdr, seqno);
                msg_set_ack(hdr, ack);
                msg_set_bcast_ack(hdr, bc_ack);
                l->rcv_unacked = 0;
-               l->stats.sent_pkts++;
-               seqno++;
+               seqno += msg_pktcnt(hdr);
        }
+       l->stats.sent_pkts += mod(seqno - l->snd_nxt);
        l->snd_nxt = seqno;
 }
 
@@ -1202,8 +1201,6 @@ static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r,
                                u16 from, u16 to, struct sk_buff_head *xmitq)
 {
        struct sk_buff *_skb, *skb = skb_peek(&l->transmq);
-       u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
-       u16 ack = l->rcv_nxt - 1;
        int retransmitted = 0;
        struct tipc_msg *hdr;
        int rc = 0;
@@ -1230,9 +1227,6 @@ static int tipc_link_bc_retrans(struct tipc_link *l, struct tipc_link *r,
                _skb = pskb_copy(skb, GFP_ATOMIC);
                if (!_skb)
                        return 0;
-               hdr = buf_msg(_skb);
-               msg_set_ack(hdr, ack);
-               msg_set_bcast_ack(hdr, bc_ack);
                _skb->priority = TC_PRIO_CONTROL;
                __skb_queue_tail(xmitq, _skb);
                l->stats.retransmitted++;
@@ -1406,13 +1400,18 @@ static int tipc_link_release_pkts(struct tipc_link *l, u16 acked)
 {
        int released = 0;
        struct sk_buff *skb, *tmp;
+       struct tipc_msg *hdr;
+       int pktcnt;
 
        skb_queue_walk_safe(&l->transmq, skb, tmp) {
-               if (more(buf_seqno(skb), acked))
+               hdr = buf_msg(skb);
+               pktcnt = msg_pktcnt(hdr);
+               if (more(msg_last_seqno(hdr), acked))
                        break;
                __skb_unlink(skb, &l->transmq);
                kfree_skb(skb);
-               released++;
+               l->transmq_len -= pktcnt;
+               released += pktcnt;
        }
        return released;
 }
@@ -1486,17 +1485,20 @@ static int tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
        bool passed = false;
        u16 released = 0;
        u16 seqno, n = 0;
+       int pktcnt;
        int rc = 0;
 
        skb_queue_walk_safe(&l->transmq, skb, tmp) {
-               seqno = buf_seqno(skb);
+               seqno = msg_last_seqno(buf_msg(skb));
 
 next_gap_ack:
                if (less_eq(seqno, acked)) {
                        /* release skb */
+                       pktcnt = msg_pktcnt(buf_msg(skb));
                        __skb_unlink(skb, &l->transmq);
                        kfree_skb(skb);
-                       released++;
+                       l->transmq_len -= pktcnt;
+                       released += pktcnt;
                } else if (less_eq(seqno, acked + gap)) {
                        /* First, check if repeated retrans failures occurs? */
                        if (!passed && link_retransmit_failure(l, l, &rc))
@@ -1622,7 +1624,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
        struct sk_buff_head *defq = &l->deferdq;
        struct tipc_msg *hdr = buf_msg(skb);
        u16 seqno, rcv_nxt, win_lim;
-       int released = 0;
+       int pktcnt, released = 0;
        int rc = 0;
 
        /* Verify and update link state */
@@ -1635,6 +1637,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
        do {
                hdr = buf_msg(skb);
                seqno = msg_seqno(hdr);
+               pktcnt = msg_pktcnt(hdr);
                rcv_nxt = l->rcv_nxt;
                win_lim = rcv_nxt + TIPC_MAX_LINK_WIN;
 
@@ -1661,14 +1664,15 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
                }
 
                /* Deliver packet */
-               l->rcv_nxt++;
+               l->rcv_nxt += pktcnt;
                l->stats.recv_pkts++;
+               l->rcv_unacked += pktcnt;
 
                if (unlikely(msg_user(hdr) == TUNNEL_PROTOCOL))
                        rc |= tipc_link_tnl_rcv(l, skb, l->inputq);
                else if (!tipc_data_input(l, skb, l->inputq))
                        rc |= tipc_link_input(l, skb, l->inputq, &l->reasm_buf);
-               if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN))
+               if (unlikely(l->rcv_unacked >= TIPC_MIN_LINK_WIN))
                        rc |= tipc_link_build_state_msg(l, xmitq);
                if (unlikely(rc & ~TIPC_LINK_SND_STATE))
                        break;
@@ -1813,6 +1817,7 @@ void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
 
        if (!tnl)
                return;
+
        skb_queue_head_init(&tnlq);
        skb_queue_head_init(&tmpxq);
 
@@ -2286,6 +2291,7 @@ void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
                        break;
                if (!--TIPC_SKB_CB(skb)->ackers) {
                        __skb_unlink(skb, &snd_l->transmq);
+                       snd_l->transmq_len -= msg_pktcnt(buf_msg(skb));
                        kfree_skb(skb);
                }
        }
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 812334d..a70d8a9 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -425,6 +425,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset,
                      FIRST_FRAGMENT, INT_H_SIZE, msg_destnode(mhdr));
        msg_set_size(&pkthdr, pktmax);
        msg_set_fragm_no(&pkthdr, pktno);
+       msg_set_pktcnt(&pkthdr, 1);
        msg_set_importance(&pkthdr, msg_importance(mhdr));
 
        /* Prepare first fragment */
@@ -828,27 +829,27 @@ bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
  * @seqno: sequence number of buffer to add
  * @skb: buffer to add
  */
-void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
+void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 first,
                             struct sk_buff *skb)
 {
+       u16 last = msg_last_seqno(buf_msg(skb));
        struct sk_buff *_skb, *tmp;
+       struct tipc_msg *_hdr;
+       u16 _first, _last;
 
-       if (skb_queue_empty(list) || less(seqno, buf_seqno(skb_peek(list)))) {
+       if (skb_queue_empty(list) || less(last, buf_seqno(skb_peek(list)))) {
                __skb_queue_head(list, skb);
                return;
        }
-
-       if (more(seqno, buf_seqno(skb_peek_tail(list)))) {
-               __skb_queue_tail(list, skb);
-               return;
-       }
-
-       skb_queue_walk_safe(list, _skb, tmp) {
-               if (more(seqno, buf_seqno(_skb)))
+       skb_queue_reverse_walk_safe(list, _skb, tmp) {
+               _hdr = buf_msg(_skb);
+               _first = msg_seqno(_hdr);
+               _last = msg_last_seqno(_hdr);
+               if (less(last, _first))
                        continue;
-               if (seqno == buf_seqno(_skb))
+               if (!less(first, _first) && !more(last, _last))
                        break;
-               __skb_queue_before(list, _skb, skb);
+               __skb_queue_after(list, _skb, skb);
                return;
        }
        kfree_skb(skb);
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 2197f64..1b5c8c8 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -709,6 +709,24 @@ static inline void msg_set_node_capabilities(struct tipc_msg *m, u32 n)
        msg_set_bits(m, 1, 15, 0x1fff, n);
 }
 
+static inline u16 msg_pktcnt(struct tipc_msg *m)
+{
+       if (likely(msg_user(m) != MSG_FRAGMENTER ||
+                  msg_type(m) != FIRST_FRAGMENT))
+               return 1;
+       return msg_bits(m, 1, 23, 0x3f);
+}
+
+static inline void msg_set_pktcnt(struct tipc_msg *m, u32 n)
+{
+       msg_set_bits(m, 1, 23, 0x3f, n);
+}
+
+static inline u16 msg_last_seqno(struct tipc_msg *m)
+{
+       return msg_seqno(m) + msg_pktcnt(m) - 1;
+}
+
 /*
  * Word 2
  */
-- 
2.1.4


