After experimenting with GSO and GRO we have found that they give very
little performance improvement. However, the previously introduced GSO
framework can be leveraged to significantly improve throughput across
TIPC/UDP/IP anyway.

We have found that we can disable the GSO callback for messages
smaller than 16k, and instead let them be transported as is by the
UDP/IP bearer. By doing this, we obtain a 45% increase in max throughput
for large messages, getting close to the values we had with the earlier
solution with an "emulated mtu". In contrast to that solution, we now
use the real mtu and the real number of outstanding network packets as
the base for the link congestion control. Furthermore, if an initial 16k
GSO chunk, corresponding to eleven 1500-byte network packets, is lost, the
algorithm will retransmit the individual network packets, not the whole
message or stream chunk.

This solution seems to be sufficiently robust to work well even in lossy
networks, and does not overwhelm and reset the virtio ring buffer as we
have seen in some cases before.

Signed-off-by: Jon Maloy <jon.ma...@ericsson.com>
---
 net/tipc/link.c      | 11 +++++++++--
 net/tipc/msg.c       |  9 +++++++--
 net/tipc/msg.h       | 23 +++++++++++++++++++----
 net/tipc/node.h      |  6 ++++--
 net/tipc/udp_media.c | 13 +++++++++++--
 5 files changed, 50 insertions(+), 12 deletions(-)

diff --git a/net/tipc/link.c b/net/tipc/link.c
index 8455fd1..7f85165 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1000,6 +1000,8 @@ int tipc_link_xmit(struct tipc_link *l, struct 
sk_buff_head *list,
                if (msg_user(hdr) == MSG_FRAGMENTER) {
                        l->stats.sent_fragmented++;
                        l->stats.sent_fragments += skb_shinfo(skb)->gso_segs;
+                       TIPC_SKB_CB(skb)->peer_gso_support = 
+                               !!(l->peer_caps & TIPC_GSO_SUPPORT);
                }
                if (likely(l->transmq_len < cwin)) {
                        msg_set_seqno(hdr, seqno);
@@ -1104,6 +1106,8 @@ static void tipc_link_advance_backlog(struct tipc_link *l,
                skb = skb_peek(&l->backlogq);
                if (!skb)
                        break;
+               TIPC_SKB_CB(skb)->peer_gso_support = 
+                       !!(l->peer_caps & TIPC_GSO_SUPPORT);
                _skb = skb_clone(skb, GFP_ATOMIC);
                if (!_skb)
                        break;
@@ -1193,6 +1197,7 @@ static int tipc_link_bc_retrans(struct tipc_link *l, 
struct tipc_link *r,
                                u16 from, u16 to, struct sk_buff_head *xmitq)
 {
        struct sk_buff *_skb, *skb = skb_peek(&l->transmq);
+       int mss = tipc_link_mtu(l) - 2 * INT_H_SIZE;
        int retransmitted = 0;
        struct tipc_msg *hdr;
        int rc = 0;
@@ -1218,7 +1223,7 @@ static int tipc_link_bc_retrans(struct tipc_link *l, 
struct tipc_link *r,
                TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM;
                if (msg_user(hdr) == MSG_FRAGMENTER) {
                        skb->priority = TC_PRIO_CONTROL;
-                       tipc_skb_segment(skb, 0, from, to, xmitq);
+                       tipc_skb_segment(skb, 0, from, to, mss, xmitq);
                        continue;
                }
                _skb = __pskb_copy(skb, LL_MAX_HEADER + MIN_H_SIZE, GFP_ATOMIC);
@@ -1474,6 +1479,7 @@ static int tipc_link_advance_transmq(struct tipc_link *l, 
u16 acked, u16 gap,
                                     struct tipc_gap_ack_blks *ga,
                                     struct sk_buff_head *xmitq)
 {
+       int mss = tipc_link_mtu(l) - 2 * INT_H_SIZE;
        struct sk_buff *skb, *_skb, *tmp;
        struct tipc_msg *hdr, *_hdr;
        u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
@@ -1509,7 +1515,8 @@ static int tipc_link_advance_transmq(struct tipc_link *l, 
u16 acked, u16 gap,
                        TIPC_SKB_CB(skb)->nxt_retr = TIPC_UC_RETR_TIME;
                        if (msg_user(hdr) == MSG_FRAGMENTER) {
                                skb->priority = TC_PRIO_CONTROL;
-                               tipc_skb_segment(skb, 0, acked, acked + gap, 
xmitq);
+                               tipc_skb_segment(skb, 0, acked, acked + gap,
+                                                mss, xmitq);
                                l->stats.retransmitted++;
                                retransmitted = true;
                                continue;
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 3c36ba2..83c2f17 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -152,6 +152,12 @@ int tipc_buf_append(struct sk_buff **headbuf, struct 
sk_buff **buf)
                if (unlikely(skb_unclone(frag, GFP_ATOMIC)))
                        goto err;
                head = *headbuf = frag;
+               if (msg_size(buf_msg(frag)) == msg_data_sz(msg)) {
+                       *buf = head;
+                       TIPC_SKB_CB(head)->tail = NULL;
+                       *headbuf = NULL;
+                       return 1;
+               }
                *buf = NULL;
                TIPC_SKB_CB(head)->tail = NULL;
                if (skb_is_nonlinear(head)) {
@@ -868,7 +874,7 @@ int tipc_skb_queue_fragment(struct sk_buff_head *skbq, int 
pktmax,
 }
 
 int tipc_skb_segment(struct sk_buff *skb, int tnl_hlen, u16 from,
-                    u16 to, struct sk_buff_head *segs)
+                    u16 to, int mss, struct sk_buff_head *segs)
 {
        struct skb_shared_info *shinfo = skb_shinfo(skb);
        unsigned char *tnl_hdr = skb->data - tnl_hlen;
@@ -877,7 +883,6 @@ int tipc_skb_segment(struct sk_buff *skb, int tnl_hlen, u16 
from,
        int frag_pos = frag->bv_offset;
        struct tipc_msg *seg_hdr = NULL;
        skb_frag_t *seg_frag = NULL;
-       int mss = shinfo->gso_size;
        struct sk_buff *seg = NULL;
        unsigned int hlen = msg_hdr_sz(hdr);
        unsigned int left = msg_data_sz(hdr);
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 8391581..9fb02b5 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -103,6 +103,7 @@ struct plist;
 #define TIPC_MEDIA_INFO_OFFSET 5
 
 struct tipc_skb_cb {
+//<<<<<<< HEAD
        union {
                struct {
                        struct sk_buff *tail;
@@ -134,12 +135,25 @@ struct tipc_skb_cb {
                };
                u8 flags;
        };
-       u8 reserved;
+       u8 reserved:7;
+       bool peer_gso_support:1;
 #ifdef CONFIG_TIPC_CRYPTO
        void *crypto_ctx;
 #endif
 } __packed;
-
+#if 0
+=======
+       u32 bytes_read;
+       u32 orig_member;
+       struct sk_buff *tail;
+       unsigned long nxt_retr;
+       u16 chain_imp;
+       u16 ackers;
+       u16 peer_caps;
+       u8 validated;
+};
+>>>>>>> tipc: let stream chunks bypass gso over udp
+#endif
 #define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0]))
 
 struct tipc_msg {
@@ -1147,15 +1161,16 @@ void tipc_skb_queue_copy(struct sk_buff_head *from, 
struct sk_buff_head *to);
 int tipc_skb_queue_fragment(struct sk_buff_head *skbq, int pktmax,
                             int *pktcnt, bool frag_supp, int mtyp);
 int tipc_skb_segment(struct sk_buff *skb, int tnl_hlen, u16 from, u16 to,
-                    struct sk_buff_head *segs);
+                    int mss, struct sk_buff_head *segs);
 
 static inline int tipc_skb_segment_all(struct sk_buff *skb, int tnl_hlen,
                                       struct sk_buff_head *segs)
 {
        u16 from = msg_seqno(buf_msg(skb));
        u16 to = from + skb_shinfo(skb)->gso_segs - 1;
+       int mss = skb_shinfo(skb)->gso_size;
 
-       return tipc_skb_segment(skb, tnl_hlen, from, to, segs);
+       return tipc_skb_segment(skb, tnl_hlen, from, to, mss, segs);
 }
 
 static inline u16 buf_seqno(struct sk_buff *skb)
diff --git a/net/tipc/node.h b/net/tipc/node.h
index a6803b4..591165c 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -55,7 +55,8 @@ enum {
        TIPC_MCAST_RBCTL      = (1 << 7),
        TIPC_GAP_ACK_BLOCK    = (1 << 8),
        TIPC_TUNNEL_ENHANCED  = (1 << 9),
-       TIPC_NAGLE            = (1 << 10)
+       TIPC_NAGLE            = (1 << 10),
+       TIPC_GSO_SUPPORT      = (1 << 11)
 };
 
 #define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT           |  \
@@ -68,7 +69,8 @@ enum {
                                TIPC_MCAST_RBCTL       |   \
                                TIPC_GAP_ACK_BLOCK     |   \
                                TIPC_TUNNEL_ENHANCED   |   \
-                               TIPC_NAGLE)
+                               TIPC_NAGLE             |   \
+                               TIPC_GSO_SUPPORT)
 
 #define INVALID_BEARER_ID -1
 
diff --git a/net/tipc/udp_media.c b/net/tipc/udp_media.c
index 974d260..210e754 100644
--- a/net/tipc/udp_media.c
+++ b/net/tipc/udp_media.c
@@ -52,6 +52,7 @@
 #include "bearer.h"
 #include "netlink.h"
 #include "msg.h"
+#include "node.h"
 
 /* IANA assigned UDP port */
 #define UDP_PORT_DEFAULT       6118
@@ -181,8 +182,16 @@ static int tipc_udp_xmit(struct net *net, struct sk_buff 
*skb,
                        }
                        dst_cache_set_ip4(cache, &rt->dst, fl.saddr);
                }
-               if (msg_user(buf_msg(skb)) == MSG_FRAGMENTER)
-                       skb_shinfo(skb)->gso_type = SKB_GSO_UDP_TUNNEL;
+               /* IP layer does fragm/defrag better than GSO/GRO layer */
+               if (msg_user(buf_msg(skb)) == MSG_FRAGMENTER) {
+                       if (skb->len <= TIPC_MSG_CHUNK_SIZE + INT_H_SIZE * 2 &&
+                           TIPC_SKB_CB(skb)->peer_gso_support) {
+                               skb_shinfo(skb)->gso_size = 0;
+                               skb_shinfo(skb)->gso_segs = 0;
+                       } else {
+                               skb_shinfo(skb)->gso_type = SKB_GSO_UDP_TUNNEL;
+                       }
+               }
                skb->dev = rt->dst.dev;
                ttl = ip4_dst_hoplimit(&rt->dst);
                udp_tunnel_xmit_skb(rt, ub->ubsock->sk, skb, src->ipv4.s_addr,
-- 
2.1.4



_______________________________________________
tipc-discussion mailing list
tipc-discussion@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to