> -----Original Message-----
> From: Jon Maloy [mailto:[email protected]]
> Sent: 2016年4月21日 23:38
> To: [email protected];
> [email protected]; Xue, Ying;
> [email protected]; [email protected]
> Cc: [email protected]
> Subject: [PATCH net-next 3/3] tipc: redesign connecton-level flow control

s/connecton/connection

> 
> There are two flow control mechanisms in TIPC; one at link level that handles
> network congestion, burst control, and retransmission, and one at
> connection level which' only remaining task is to prevent overflow in the
> receiving socket buffer. In TIPC, the latter task has to be solved end-to-end
> because messages can not be thown away once they have been accepted

s/thown/thrown

> and delivered upwards from the link layer, i.e, we can never permit the
> receive buffer to overflow.
> 
> Currently, this algorithm is message based. A counter in the receiving socket
> keeps track of number of consumed messages, and sends a dedicated
> acknowledge message back to the sender for each 256 consumed message.
> A counter at the sending end keeps track of the sent, not yet acknowledged
> messages, and blocks the sender if this number ever reaches
> 512 unacknowledged messages. When the missing acknowledge arrives, the
> socket is then woken up for renewed transmission. This works well for
> keeping the message flow running, as it almost never happens that a sender
> socket is blocked this way.
> 
> A problem with the current mechanism is that it is potentially very memory
> consuming. Since we don't distinguish beween very small and very large

s/beween/between

> messages, we have to dimension the socket receive buffer according to a
> worst-case of both. I.e., the window size must be chosen large enough to
> sustain a reasonable throughput even for the smallest messages, while we
> must still consider a scenario where all messages are of maximum size. Hence,
> the current fix window size of 512 messages and a maximum message size of
> 66k results in a receive buffer of 66 MB when truesize(66k) = 131k is taken
> into account. It is possible to do much better.
> 
> This commit introduces an algorithm where we instead use 1024-byte blocks
> as base unit. This unit, always rounded upwards from the actual message size,
> is used when we advertise windows as well as when we count and
> acknowledge transmitted data. The advertised window is is based on the

Please remove the redundant "is".

> configured receive buffer size in such a way that even the worst-case
> truesize/msgsize ratio always is covered. Since the smallest possible message
> size (from a flow control viewpoint) now is
> 1024 bytes, we can safely assume this ratio to be less than four, which is the
> value we are now using.
> 
> This way, we have been able to reduce the default receive buffer size from
> 66 MB to 2 MB with maintained performance.
> 
> In order to keep this solution backwards compatible, we introduce a new
> capabability bit in the discovery protocol, and use this throughout the

s/capabability/capability

> message sending/reception path to always select the right base unit.
> 
> Signed-off-by: Jon Maloy <[email protected]>
> ---
>  net/tipc/core.c   |   8 ++--
>  net/tipc/msg.h    |  14 +++++-
>  net/tipc/node.h   |   5 +-
>  net/tipc/socket.c | 140 +++++++++++++++++++++++++++++++++++---------
> ----------
>  net/tipc/socket.h |  17 +++++--
>  5 files changed, 122 insertions(+), 62 deletions(-)
> 
> diff --git a/net/tipc/core.c b/net/tipc/core.c index 03a8428..3c94d76 100644
> --- a/net/tipc/core.c
> +++ b/net/tipc/core.c
> @@ -111,11 +111,9 @@ static int __init tipc_init(void)
> 
>       pr_info("Activated (version " TIPC_MOD_VER ")\n");
> 
> -     sysctl_tipc_rmem[0] = TIPC_CONN_OVERLOAD_LIMIT >> 4 <<
> -                           TIPC_LOW_IMPORTANCE;
> -     sysctl_tipc_rmem[1] = TIPC_CONN_OVERLOAD_LIMIT >> 4 <<
> -                           TIPC_CRITICAL_IMPORTANCE;
> -     sysctl_tipc_rmem[2] = TIPC_CONN_OVERLOAD_LIMIT;
> +     sysctl_tipc_rmem[0] = RCVBUF_MIN;
> +     sysctl_tipc_rmem[1] = RCVBUF_DEF;
> +     sysctl_tipc_rmem[2] = RCVBUF_MAX;
> 
>       err = tipc_netlink_start();
>       if (err)
> diff --git a/net/tipc/msg.h b/net/tipc/msg.h index 58bf515..024da8a 100644
> --- a/net/tipc/msg.h
> +++ b/net/tipc/msg.h
> @@ -743,16 +743,26 @@ static inline void msg_set_msgcnt(struct tipc_msg
> *m, u16 n)
>       msg_set_bits(m, 9, 16, 0xffff, n);
>  }
> 
> -static inline u32 msg_bcast_tag(struct tipc_msg *m)
> +static inline u32 msg_conn_ack(struct tipc_msg *m)
>  {
>       return msg_bits(m, 9, 16, 0xffff);
>  }
> 
> -static inline void msg_set_bcast_tag(struct tipc_msg *m, u32 n)
> +static inline void msg_set_conn_ack(struct tipc_msg *m, u32 n)
>  {
>       msg_set_bits(m, 9, 16, 0xffff, n);
>  }
> 
> +static inline u32 msg_adv_win(struct tipc_msg *m) {
> +     return msg_bits(m, 9, 0, 0xffff);
> +}
> +
> +static inline void msg_set_adv_win(struct tipc_msg *m, u32 n) {
> +     msg_set_bits(m, 9, 0, 0xffff, n);
> +}
> +
>  static inline u32 msg_max_pkt(struct tipc_msg *m)  {
>       return msg_bits(m, 9, 16, 0xffff) * 4; diff --git a/net/tipc/node.h
> b/net/tipc/node.h index 1823768..8264b3d 100644
> --- a/net/tipc/node.h
> +++ b/net/tipc/node.h
> @@ -45,10 +45,11 @@
>  /* Optional capabilities supported by this code version
>   */
>  enum {
> -     TIPC_BCAST_SYNCH = (1 << 1)
> +     TIPC_BCAST_SYNCH   = (1 << 1),
> +     TIPC_BLOCK_FLOWCTL = (2 << 1)
>  };
> 
> -#define TIPC_NODE_CAPABILITIES TIPC_BCAST_SYNCH
> +#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH |
> TIPC_BLOCK_FLOWCTL)
>  #define INVALID_BEARER_ID -1
> 
>  void tipc_node_stop(struct net *net);
> diff --git a/net/tipc/socket.c b/net/tipc/socket.c index 94bd286..4f4048e
> 100644
> --- a/net/tipc/socket.c
> +++ b/net/tipc/socket.c
> @@ -96,9 +96,11 @@ struct tipc_sock {
>       uint conn_timeout;
>       atomic_t dupl_rcvcnt;
>       bool link_cong;
> -     uint sent_unacked;
> -     uint rcv_unacked;
> +     u16 snt_unacked;
> +     u16 snd_win;
>       u16 peer_caps;
> +     u16 rcv_unacked;
> +     u16 rcv_win;
>       struct sockaddr_tipc remote;
>       struct rhash_head node;
>       struct rcu_head rcu;
> @@ -228,9 +230,29 @@ static struct tipc_sock *tipc_sk(const struct sock *sk)
>       return container_of(sk, struct tipc_sock, sk);  }
> 
> -static int tsk_conn_cong(struct tipc_sock *tsk)
> +static bool tsk_conn_cong(struct tipc_sock *tsk)
>  {
> -     return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
> +     return tsk->snt_unacked >= tsk->snd_win; }
> +
> +/* tsk_blocks(): translate a buffer size in bytes to number of
> + * advertisable blocks, taking into account the ratio truesize(len)/len

s/advertisable/advisable

> + * We can trust that this ratio is always < 4 for len >= FLOWCTL_BKL_SZ
> +*/ static u16 tsk_adv_blocks(int len) {
> +     return len / FLOWCTL_BLK_SZ / 4;
> +}
> +
> +/* tsk_inc(): increment counter for sent or received data
> + * - If block based flow control is not spported by peer we

s/spported/supported

> + *   fall back to message based ditto, incrementing the counter
> + */
> +static u16 tsk_inc(struct tipc_sock *tsk, int msglen) {
> +     if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
> +             return ((msglen / FLOWCTL_BLK_SZ) + 1);
> +     return 1;
>  }
> 
>  /**
> @@ -378,9 +400,12 @@ static int tipc_sk_create(struct net *net, struct
> socket *sock,
>       sk->sk_write_space = tipc_write_space;
>       sk->sk_destruct = tipc_sock_destruct;
>       tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
> -     tsk->sent_unacked = 0;
>       atomic_set(&tsk->dupl_rcvcnt, 0);
> 
> +     /* Start out with safe limits until we receive an advertised window */
> +     tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
> +     tsk->rcv_win = tsk->snd_win;
> +
>       if (sock->state == SS_READY) {
>               tsk_set_unreturnable(tsk, true);
>               if (sock->type == SOCK_DGRAM)
> @@ -776,7 +801,7 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk,
> struct sk_buff *skb)
>       struct sock *sk = &tsk->sk;
>       struct tipc_msg *hdr = buf_msg(skb);
>       int mtyp = msg_type(hdr);
> -     int conn_cong;
> +     bool conn_cong;
> 
>       /* Ignore if connection cannot be validated: */
>       if (!tsk_peer_msg(tsk, hdr))
> @@ -790,7 +815,9 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk,
> struct sk_buff *skb)
>               return;
>       } else if (mtyp == CONN_ACK) {
>               conn_cong = tsk_conn_cong(tsk);
> -             tsk->sent_unacked -= msg_msgcnt(hdr);
> +             tsk->snt_unacked -= msg_conn_ack(hdr);
> +             if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
> +                     tsk->snd_win = msg_adv_win(hdr);
>               if (conn_cong)
>                       sk->sk_write_space(sk);
>       } else if (mtyp != CONN_PROBE_REPLY) { @@ -1021,12 +1048,14 @@
> static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t
> dsz)
>       u32 dnode;
>       uint mtu, send, sent = 0;
>       struct iov_iter save;
> +     int hlen = MIN_H_SIZE;
> 
>       /* Handle implied connection establishment */
>       if (unlikely(dest)) {
>               rc = __tipc_sendmsg(sock, m, dsz);
> +             hlen = msg_hdr_sz(mhdr);
>               if (dsz && (dsz == rc))
> -                     tsk->sent_unacked = 1;
> +                     tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
>               return rc;
>       }
>       if (dsz > (uint)INT_MAX)
> @@ -1055,7 +1084,7 @@ next:
>               if (likely(!tsk_conn_cong(tsk))) {
>                       rc = tipc_node_xmit(net, &pktchain, dnode, portid);
>                       if (likely(!rc)) {
> -                             tsk->sent_unacked++;
> +                             tsk->snt_unacked += tsk_inc(tsk, send +
> hlen);
>                               sent += send;
>                               if (sent == dsz)
>                                       return dsz;
> @@ -1120,6 +1149,12 @@ static void tipc_sk_finish_conn(struct tipc_sock
> *tsk, u32 peer_port,
>       tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
>       tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
>       tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
> +     if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
> +             return;
> +
> +     /* Fall back to message based flow control */
> +     tsk->rcv_win = FLOWCTL_MSG_WIN;
> +     tsk->snd_win = FLOWCTL_MSG_WIN;

The setting above seems redundant. Even if the peer socket doesn't support the
new flow control algorithm, its default window size is 512, which is the same as
the value already configured in tipc_sk_create(), as follows:
> +     /* Start out with safe limits until we receive an advertised window */
> +     tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
> +     tsk->rcv_win = tsk->snd_win;

Apart from the typos noted above, the new algorithm is very good.

Regards,
Ying

>  }
> 
>  /**
> @@ -1216,7 +1251,7 @@ static int tipc_sk_anc_data_recv(struct msghdr *m,
> struct tipc_msg *msg,
>       return 0;
>  }
> 
> -static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
> +static void tipc_sk_send_ack(struct tipc_sock *tsk)
>  {
>       struct net *net = sock_net(&tsk->sk);
>       struct sk_buff *skb = NULL;
> @@ -1232,7 +1267,14 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk,
> uint ack)
>       if (!skb)
>               return;
>       msg = buf_msg(skb);
> -     msg_set_msgcnt(msg, ack);
> +     msg_set_conn_ack(msg, tsk->rcv_unacked);
> +     tsk->rcv_unacked = 0;
> +
> +     /* Adjust to and advertize the correct window limit */
> +     if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
> +             tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
> +             msg_set_adv_win(msg, tsk->rcv_win);
> +     }
>       tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));  }
> 
> @@ -1290,7 +1332,7 @@ static int tipc_recvmsg(struct socket *sock, struct
> msghdr *m, size_t buf_len,
>       long timeo;
>       unsigned int sz;
>       u32 err;
> -     int res;
> +     int res, hlen;
> 
>       /* Catch invalid receive requests */
>       if (unlikely(!buf_len))
> @@ -1315,6 +1357,7 @@ restart:
>       buf = skb_peek(&sk->sk_receive_queue);
>       msg = buf_msg(buf);
>       sz = msg_data_sz(msg);
> +     hlen = msg_hdr_sz(msg);
>       err = msg_errcode(msg);
> 
>       /* Discard an empty non-errored message & try again */ @@ -1337,7
> +1380,7 @@ restart:
>                       sz = buf_len;
>                       m->msg_flags |= MSG_TRUNC;
>               }
> -             res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m,
> sz);
> +             res = skb_copy_datagram_msg(buf, hlen, m, sz);
>               if (res)
>                       goto exit;
>               res = sz;
> @@ -1349,15 +1392,15 @@ restart:
>                       res = -ECONNRESET;
>       }
> 
> -     /* Consume received message (optional) */
> -     if (likely(!(flags & MSG_PEEK))) {
> -             if ((sock->state != SS_READY) &&
> -                 (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
> -                     tipc_sk_send_ack(tsk, tsk->rcv_unacked);
> -                     tsk->rcv_unacked = 0;
> -             }
> -             tsk_advance_rx_queue(sk);
> +     if (unlikely(flags & MSG_PEEK))
> +             goto exit;
> +
> +     if (likely(sock->state != SS_READY)) {
> +             tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
> +             if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
> +                     tipc_sk_send_ack(tsk);
>       }
> +     tsk_advance_rx_queue(sk);
>  exit:
>       release_sock(sk);
>       return res;
> @@ -1386,7 +1429,7 @@ static int tipc_recv_stream(struct socket *sock,
> struct msghdr *m,
>       int sz_to_copy, target, needed;
>       int sz_copied = 0;
>       u32 err;
> -     int res = 0;
> +     int res = 0, hlen;
> 
>       /* Catch invalid receive attempts */
>       if (unlikely(!buf_len))
> @@ -1412,6 +1455,7 @@ restart:
>       buf = skb_peek(&sk->sk_receive_queue);
>       msg = buf_msg(buf);
>       sz = msg_data_sz(msg);
> +     hlen = msg_hdr_sz(msg);
>       err = msg_errcode(msg);
> 
>       /* Discard an empty non-errored message & try again */ @@ -1436,8
> +1480,7 @@ restart:
>               needed = (buf_len - sz_copied);
>               sz_to_copy = (sz <= needed) ? sz : needed;
> 
> -             res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) +
> offset,
> -                                         m, sz_to_copy);
> +             res = skb_copy_datagram_msg(buf, hlen + offset, m,
> sz_to_copy);
>               if (res)
>                       goto exit;
> 
> @@ -1459,20 +1502,18 @@ restart:
>                       res = -ECONNRESET;
>       }
> 
> -     /* Consume received message (optional) */
> -     if (likely(!(flags & MSG_PEEK))) {
> -             if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
> -                     tipc_sk_send_ack(tsk, tsk->rcv_unacked);
> -                     tsk->rcv_unacked = 0;
> -             }
> -             tsk_advance_rx_queue(sk);
> -     }
> +     if (unlikely(flags & MSG_PEEK))
> +             goto exit;
> +
> +     tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
> +     if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
> +             tipc_sk_send_ack(tsk);
> +     tsk_advance_rx_queue(sk);
> 
>       /* Loop around if more data is required */
>       if ((sz_copied < buf_len) &&    /* didn't get all requested data */
>           (!skb_queue_empty(&sk->sk_receive_queue) ||
>           (sz_copied < target)) &&    /* and more is ready or required */
> -         (!(flags & MSG_PEEK)) &&    /* and aren't just peeking at data */
>           (!err))                     /* and haven't reached a FIN */
>               goto restart;
> 
> @@ -1604,30 +1645,33 @@ static bool filter_connect(struct tipc_sock *tsk,
> struct sk_buff *skb)
>  /**
>   * rcvbuf_limit - get proper overload limit of socket receive queue
>   * @sk: socket
> - * @buf: message
> + * @skb: message
>   *
> - * For all connection oriented messages, irrespective of importance,
> - * the default overload value (i.e. 67MB) is set as limit.
> + * For connection oriented messages, irrespective of importance,
> + * default queue limit is 2 MB.
>   *
> - * For all connectionless messages, by default new queue limits are
> - * as belows:
> + * For connectionless messages, queue limits are based on message
> + * importance as follows:
>   *
> - * TIPC_LOW_IMPORTANCE       (4 MB)
> - * TIPC_MEDIUM_IMPORTANCE    (8 MB)
> - * TIPC_HIGH_IMPORTANCE      (16 MB)
> - * TIPC_CRITICAL_IMPORTANCE  (32 MB)
> + * TIPC_LOW_IMPORTANCE       (2 MB)
> + * TIPC_MEDIUM_IMPORTANCE    (4 MB)
> + * TIPC_HIGH_IMPORTANCE      (8 MB)
> + * TIPC_CRITICAL_IMPORTANCE  (16 MB)
>   *
>   * Returns overload limit according to corresponding message importance
>   */
> -static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
> +static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
>  {
> -     struct tipc_msg *msg = buf_msg(buf);
> +     struct tipc_sock *tsk = tipc_sk(sk);
> +     struct tipc_msg *hdr = buf_msg(skb);
> +
> +     if (unlikely(!msg_connected(hdr)))
> +             return sk->sk_rcvbuf << msg_importance(hdr);
> 
> -     if (msg_connected(msg))
> -             return sysctl_tipc_rmem[2];
> +     if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
> +             return sk->sk_rcvbuf;
> 
> -     return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
> -             msg_importance(msg);
> +     return FLOWCTL_MSG_LIM;
>  }
> 
>  /**
> diff --git a/net/tipc/socket.h b/net/tipc/socket.h index 4241f22..06fb594
> 100644
> --- a/net/tipc/socket.h
> +++ b/net/tipc/socket.h
> @@ -1,6 +1,6 @@
>  /* net/tipc/socket.h: Include file for TIPC socket code
>   *
> - * Copyright (c) 2014-2015, Ericsson AB
> + * Copyright (c) 2014-2016, Ericsson AB
>   * All rights reserved.
>   *
>   * Redistribution and use in source and binary forms, with or without @@ -
> 38,10 +38,17 @@  #include <net/sock.h>  #include <net/genetlink.h>
> 
> -#define TIPC_CONNACK_INTV         256
> -#define TIPC_FLOWCTRL_WIN        (TIPC_CONNACK_INTV * 2)
> -#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \
> -
> SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
> +/* Compatibility values for deprecated message based flow control */
> +#define FLOWCTL_MSG_WIN 512 #define FLOWCTL_MSG_LIM
> ((FLOWCTL_MSG_WIN *
> +2 + 1) * SKB_TRUESIZE(MAX_MSG_SIZE))
> +
> +#define FLOWCTL_BLK_SZ 1024
> +
> +/* Socket receive buffer sizes */
> +#define RCVBUF_MIN  (FLOWCTL_BLK_SZ * 512) #define RCVBUF_DEF
> +(FLOWCTL_BLK_SZ * 1024 * 2) #define RCVBUF_MAX  (FLOWCTL_BLK_SZ *
> 1024
> +* 16)
> +
>  int tipc_socket_init(void);
>  void tipc_socket_stop(void);
>  void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);
> --
> 1.9.1

------------------------------------------------------------------------------
Find and fix application performance issues faster with Applications Manager
Applications Manager provides deep performance insights into multiple tiers of
your business applications. It resolves application problems quickly and
reduces your MTTR. Get your free trial!
https://ad.doubleclick.net/ddm/clk/302982198;130105516;z
_______________________________________________
tipc-discussion mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to