On 10/30/19 4:11 AM, Jon Maloy wrote: > We introduce a feature that works like a combination of TCP_NAGLE and > TCP_CORK, but without some of the weaknesses of those. In particular, > we will not observe long delivery delays because of delayed acks, since > the algorithm itself decides if and when acks are to be sent from the > receiving peer. > > - The nagle property as such is determined by manipulating a new > 'maxnagle' field in struct tipc_sock. If certain conditions are met, > 'maxnagle' will define the max size of the messages which can be bundled. > If it is set to zero, no messages are ever bundled, implying that the > nagle property is disabled. > - A socket with the nagle property enabled enters nagle mode when more > than 4 messages have been sent out without receiving any data message > from the peer. > - A socket leaves nagle mode whenever it receives a data message from > the peer. > > In nagle mode, messages smaller than 'maxnagle' are accumulated in the > socket write queue. The last buffer in the queue is marked with a new > 'ack_required' bit, which forces the receiving peer to send a CONN_ACK > message back to the sender upon reception. > > The accumulated contents of the write queue are transmitted when one of > the following events or conditions occurs: > > - A CONN_ACK message is received from the peer. > - A data message is received from the peer. > - A SOCK_WAKEUP pseudo message is received from the link level. > - The write queue contains more than 64 1k blocks of data. > - The connection is being shut down. > - There is no CONN_ACK message to expect. I.e., there is currently > no outstanding message where the 'ack_required' bit was set. As a > consequence, the first message added after we enter nagle mode > is always sent directly with this bit set. > > This new feature gives a 50-100% improvement of throughput for small > (i.e., less than MTU size) messages, while it might add up to one RTT > to latency time when the socket is in nagle mode. 
> > Signed-off-by: Jon Maloy <jon.ma...@ericsson.com>
Acked-by: Ying Xue <ying....@windriver.com> > > v2: Added TIPC_NODELAY socket option > v3: Gave TIPC_NODELAY a parameter as suggested by Ying, and simplified > test for when we can bundle messages. > --- > include/uapi/linux/tipc.h | 1 + > net/tipc/msg.c | 53 +++++++++++++++++++++ > net/tipc/msg.h | 12 +++++ > net/tipc/node.h | 7 ++- > net/tipc/socket.c | 114 > +++++++++++++++++++++++++++++++++++++++------- > 5 files changed, 169 insertions(+), 18 deletions(-) > > diff --git a/include/uapi/linux/tipc.h b/include/uapi/linux/tipc.h > index 7df026e..76421b8 100644 > --- a/include/uapi/linux/tipc.h > +++ b/include/uapi/linux/tipc.h > @@ -191,6 +191,7 @@ struct sockaddr_tipc { > #define TIPC_GROUP_JOIN 135 /* Takes struct tipc_group_req* */ > #define TIPC_GROUP_LEAVE 136 /* No argument */ > #define TIPC_SOCK_RECVQ_USED 137 /* Default: none (read only) */ > +#define TIPC_NODELAY 138 /* Default: false */ > > /* > * Flag values > diff --git a/net/tipc/msg.c b/net/tipc/msg.c > index 922d262..973795a 100644 > --- a/net/tipc/msg.c > +++ b/net/tipc/msg.c > @@ -190,6 +190,59 @@ int tipc_buf_append(struct sk_buff **headbuf, struct > sk_buff **buf) > return 0; > } > > +/** > + * tipc_msg_append(): Append data to tail of an existing buffer queue > + * @hdr: header to be used > + * @m: the data to be appended > + * @mss: max allowable size of buffer > + * @dlen: size of data to be appended > + * @txq: queue to append to > + * Returns the number of 1k blocks appended or errno value > + */ > +int tipc_msg_append(struct tipc_msg *_hdr, struct msghdr *m, int dlen, > + int mss, struct sk_buff_head *txq) > +{ > + struct sk_buff *skb, *prev; > + int accounted, total, curr; > + int mlen, cpy, rem = dlen; > + struct tipc_msg *hdr; > + > + skb = skb_peek_tail(txq); > + accounted = skb ? 
msg_blocks(buf_msg(skb)) : 0; > + total = accounted; > + > + while (rem) { > + if (!skb || skb->len >= mss) { > + prev = skb; > + skb = tipc_buf_acquire(mss, GFP_KERNEL); > + if (unlikely(!skb)) > + return -ENOMEM; > + skb_orphan(skb); > + skb_trim(skb, MIN_H_SIZE); > + hdr = buf_msg(skb); > + skb_copy_to_linear_data(skb, _hdr, MIN_H_SIZE); > + msg_set_hdr_sz(hdr, MIN_H_SIZE); > + msg_set_size(hdr, MIN_H_SIZE); > + __skb_queue_tail(txq, skb); > + total += 1; > + if (prev) > + msg_set_ack_required(buf_msg(prev), 0); > + msg_set_ack_required(hdr, 1); > + } > + hdr = buf_msg(skb); > + curr = msg_blocks(hdr); > + mlen = msg_size(hdr); > + cpy = min_t(int, rem, mss - mlen); > + if (cpy != copy_from_iter(skb->data + mlen, cpy, &m->msg_iter)) > + return -EFAULT; > + msg_set_size(hdr, mlen + cpy); > + skb_put(skb, cpy); > + rem -= cpy; > + total += msg_blocks(hdr) - curr; > + } > + return total - accounted; > +} > + > /* tipc_msg_validate - validate basic format of received message > * > * This routine ensures a TIPC message has an acceptable header, and at least > diff --git a/net/tipc/msg.h b/net/tipc/msg.h > index 0daa6f0..b85b85a 100644 > --- a/net/tipc/msg.h > +++ b/net/tipc/msg.h > @@ -290,6 +290,16 @@ static inline void msg_set_src_droppable(struct tipc_msg > *m, u32 d) > msg_set_bits(m, 0, 18, 1, d); > } > > +static inline int msg_ack_required(struct tipc_msg *m) > +{ > + return msg_bits(m, 0, 18, 1); > +} > + > +static inline void msg_set_ack_required(struct tipc_msg *m, u32 d) > +{ > + msg_set_bits(m, 0, 18, 1, d); > +} > + > static inline bool msg_is_rcast(struct tipc_msg *m) > { > return msg_bits(m, 0, 18, 0x1); > @@ -1065,6 +1075,8 @@ int tipc_msg_fragment(struct sk_buff *skb, const struct > tipc_msg *hdr, > int pktmax, struct sk_buff_head *frags); > int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, > int offset, int dsz, int mtu, struct sk_buff_head *list); > +int tipc_msg_append(struct tipc_msg *hdr, struct msghdr *m, int dlen, > + int mss, struct 
sk_buff_head *txq); > bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err); > bool tipc_msg_assemble(struct sk_buff_head *list); > bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head > *rcvq); > diff --git a/net/tipc/node.h b/net/tipc/node.h > index 291d0ec..b9036f28 100644 > --- a/net/tipc/node.h > +++ b/net/tipc/node.h > @@ -54,7 +54,8 @@ enum { > TIPC_LINK_PROTO_SEQNO = (1 << 6), > TIPC_MCAST_RBCTL = (1 << 7), > TIPC_GAP_ACK_BLOCK = (1 << 8), > - TIPC_TUNNEL_ENHANCED = (1 << 9) > + TIPC_TUNNEL_ENHANCED = (1 << 9), > + TIPC_NAGLE = (1 << 10) > }; > > #define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \ > @@ -66,7 +67,9 @@ enum { > TIPC_LINK_PROTO_SEQNO | \ > TIPC_MCAST_RBCTL | \ > TIPC_GAP_ACK_BLOCK | \ > - TIPC_TUNNEL_ENHANCED) > + TIPC_TUNNEL_ENHANCED | \ > + TIPC_NAGLE) > + > #define INVALID_BEARER_ID -1 > > void tipc_node_stop(struct net *net); > diff --git a/net/tipc/socket.c b/net/tipc/socket.c > index 35e32ff..20bad67 100644 > --- a/net/tipc/socket.c > +++ b/net/tipc/socket.c > @@ -75,6 +75,7 @@ struct sockaddr_pair { > * @conn_instance: TIPC instance used when connection was established > * @published: non-zero if port has one or more associated names > * @max_pkt: maximum packet size "hint" used when building messages sent by > port > + * @maxnagle: maximum size of msg which can be subject to nagle > * @portid: unique port identity in TIPC socket hash table > * @phdr: preformatted message header used when sending messages > * #cong_links: list of congested links > @@ -97,6 +98,7 @@ struct tipc_sock { > u32 conn_instance; > int published; > u32 max_pkt; > + u32 maxnagle; > u32 portid; > struct tipc_msg phdr; > struct list_head cong_links; > @@ -116,6 +118,10 @@ struct tipc_sock { > struct tipc_mc_method mc_method; > struct rcu_head rcu; > struct tipc_group *group; > + u32 oneway; > + u16 snd_backlog; > + bool expect_ack; > + bool nodelay; > bool group_is_open; > }; > > @@ -137,6 +143,7 @@ static int 
tipc_sk_insert(struct tipc_sock *tsk); > static void tipc_sk_remove(struct tipc_sock *tsk); > static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t > dsz); > static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz); > +static void tipc_sk_push_backlog(struct tipc_sock *tsk); > > static const struct proto_ops packet_ops; > static const struct proto_ops stream_ops; > @@ -227,6 +234,26 @@ static u16 tsk_inc(struct tipc_sock *tsk, int msglen) > return 1; > } > > +/* tsk_set_nagle - enable/disable nagle property by manipulating maxnagle > + */ > +static void tsk_set_nagle(struct tipc_sock *tsk) > +{ > + struct sock *sk = &tsk->sk; > + > + tsk->maxnagle = 0; > + if (sk->sk_type != SOCK_STREAM) > + return; > + if (tsk->nodelay) > + return; > + if (!(tsk->peer_caps & TIPC_NAGLE)) > + return; > + /* Limit node local buffer size to avoid receive queue overflow */ > + if (tsk->max_pkt == MAX_MSG_SIZE) > + tsk->maxnagle = 1500; > + else > + tsk->maxnagle = tsk->max_pkt; > +} > + > /** > * tsk_advance_rx_queue - discard first buffer in socket receive queue > * > @@ -446,6 +473,7 @@ static int tipc_sk_create(struct net *net, struct socket > *sock, > > tsk = tipc_sk(sk); > tsk->max_pkt = MAX_PKT_DEFAULT; > + tsk->maxnagle = 0; > INIT_LIST_HEAD(&tsk->publications); > INIT_LIST_HEAD(&tsk->cong_links); > msg = &tsk->phdr; > @@ -512,8 +540,12 @@ static void __tipc_shutdown(struct socket *sock, int > error) > tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt && > !tsk_conn_cong(tsk))); > > - /* Remove any pending SYN message */ > - __skb_queue_purge(&sk->sk_write_queue); > + /* Push out unsent messages or remove if pending SYN */ > + skb = skb_peek(&sk->sk_write_queue); > + if (skb && !msg_is_syn(buf_msg(skb))) > + tipc_sk_push_backlog(tsk); > + else > + __skb_queue_purge(&sk->sk_write_queue); > > /* Reject all unreceived messages, except on an active connection > * (which disconnects locally & sends a 'FIN+' to peer). 
> @@ -1208,6 +1240,27 @@ void tipc_sk_mcast_rcv(struct net *net, struct > sk_buff_head *arrvq, > tipc_sk_rcv(net, inputq); > } > > +/* tipc_sk_push_backlog(): send accumulated buffers in socket write queue > + * when socket is in Nagle mode > + */ > +static void tipc_sk_push_backlog(struct tipc_sock *tsk) > +{ > + struct sk_buff_head *txq = &tsk->sk.sk_write_queue; > + struct net *net = sock_net(&tsk->sk); > + u32 dnode = tsk_peer_node(tsk); > + int rc; > + > + if (skb_queue_empty(txq) || tsk->cong_link_cnt) > + return; > + > + tsk->snt_unacked += tsk->snd_backlog; > + tsk->snd_backlog = 0; > + tsk->expect_ack = true; > + rc = tipc_node_xmit(net, txq, dnode, tsk->portid); > + if (rc == -ELINKCONG) > + tsk->cong_link_cnt = 1; > +} > + > /** > * tipc_sk_conn_proto_rcv - receive a connection mng protocol message > * @tsk: receiving socket > @@ -1221,7 +1274,7 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock > *tsk, struct sk_buff *skb, > u32 onode = tsk_own_node(tsk); > struct sock *sk = &tsk->sk; > int mtyp = msg_type(hdr); > - bool conn_cong; > + bool was_cong; > > /* Ignore if connection cannot be validated: */ > if (!tsk_peer_msg(tsk, hdr)) { > @@ -1254,11 +1307,13 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock > *tsk, struct sk_buff *skb, > __skb_queue_tail(xmitq, skb); > return; > } else if (mtyp == CONN_ACK) { > - conn_cong = tsk_conn_cong(tsk); > + was_cong = tsk_conn_cong(tsk); > + tsk->expect_ack = false; > + tipc_sk_push_backlog(tsk); > tsk->snt_unacked -= msg_conn_ack(hdr); > if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) > tsk->snd_win = msg_adv_win(hdr); > - if (conn_cong) > + if (was_cong && !tsk_conn_cong(tsk)) > sk->sk_write_space(sk); > } else if (mtyp != CONN_PROBE_REPLY) { > pr_warn("Received unknown CONN_PROTO msg\n"); > @@ -1437,16 +1492,17 @@ static int __tipc_sendstream(struct socket *sock, > struct msghdr *m, size_t dlen) > struct sock *sk = sock->sk; > DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); > long timeout = 
sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); > + struct sk_buff_head *txq = &sk->sk_write_queue; > struct tipc_sock *tsk = tipc_sk(sk); > struct tipc_msg *hdr = &tsk->phdr; > struct net *net = sock_net(sk); > - struct sk_buff_head pkts; > u32 dnode = tsk_peer_node(tsk); > + int blocks = tsk->snd_backlog; > + int maxnagle = tsk->maxnagle; > + int maxpkt = tsk->max_pkt; > int send, sent = 0; > int rc = 0; > > - __skb_queue_head_init(&pkts); > - > if (unlikely(dlen > INT_MAX)) > return -EMSGSIZE; > > @@ -1467,21 +1523,35 @@ static int __tipc_sendstream(struct socket *sock, > struct msghdr *m, size_t dlen) > tipc_sk_connected(sk))); > if (unlikely(rc)) > break; > - > send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE); > - rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts); > - if (unlikely(rc != send)) > - break; > > - trace_tipc_sk_sendstream(sk, skb_peek(&pkts), > + if (tsk->oneway++ >= 4 && send <= maxnagle) { > + rc = tipc_msg_append(hdr, m, send, maxnagle, txq); > + if (rc < 0) > + break; > + blocks += rc; > + if (blocks <= 64 && tsk->expect_ack) { > + tsk->snd_backlog = blocks; > + sent += send; > + break; > + } > + tsk->expect_ack = true; > + } else { > + rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq); > + if (unlikely(rc != send)) > + break; > + blocks += tsk_inc(tsk, send + MIN_H_SIZE); > + } > + trace_tipc_sk_sendstream(sk, skb_peek(txq), > TIPC_DUMP_SK_SNDQ, " "); > - rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); > + rc = tipc_node_xmit(net, txq, dnode, tsk->portid); > if (unlikely(rc == -ELINKCONG)) { > tsk->cong_link_cnt = 1; > rc = 0; > } > if (likely(!rc)) { > - tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE); > + tsk->snt_unacked += blocks; > + tsk->snd_backlog = 0; > sent += send; > } > } while (sent < dlen && !rc); > @@ -1528,6 +1598,7 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, > u32 peer_port, > tipc_node_add_conn(net, peer_node, tsk->portid, peer_port); > tsk->max_pkt = tipc_node_get_mtu(net, 
peer_node, tsk->portid); > tsk->peer_caps = tipc_node_get_capabilities(net, peer_node); > + tsk_set_nagle(tsk); > __skb_queue_purge(&sk->sk_write_queue); > if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) > return; > @@ -1848,6 +1919,7 @@ static int tipc_recvstream(struct socket *sock, struct > msghdr *m, > bool peek = flags & MSG_PEEK; > int offset, required, copy, copied = 0; > int hlen, dlen, err, rc; > + bool ack = false; > long timeout; > > /* Catch invalid receive attempts */ > @@ -1892,6 +1964,7 @@ static int tipc_recvstream(struct socket *sock, struct > msghdr *m, > > /* Copy data if msg ok, otherwise return error/partial data */ > if (likely(!err)) { > + ack = msg_ack_required(hdr); > offset = skb_cb->bytes_read; > copy = min_t(int, dlen - offset, buflen - copied); > rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy); > @@ -1919,7 +1992,7 @@ static int tipc_recvstream(struct socket *sock, struct > msghdr *m, > > /* Send connection flow control advertisement when applicable */ > tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen); > - if (unlikely(tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)) > + if (ack || tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE) > tipc_sk_send_ack(tsk); > > /* Exit if all requested data or FIN/error received */ > @@ -1990,6 +2063,7 @@ static void tipc_sk_proto_rcv(struct sock *sk, > smp_wmb(); > tsk->cong_link_cnt--; > wakeup = true; > + tipc_sk_push_backlog(tsk); > break; > case GROUP_PROTOCOL: > tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq); > @@ -2029,6 +2103,7 @@ static bool tipc_sk_filter_connect(struct tipc_sock > *tsk, struct sk_buff *skb) > > if (unlikely(msg_mcast(hdr))) > return false; > + tsk->oneway = 0; > > switch (sk->sk_state) { > case TIPC_CONNECTING: > @@ -2074,6 +2149,8 @@ static bool tipc_sk_filter_connect(struct tipc_sock > *tsk, struct sk_buff *skb) > return true; > return false; > case TIPC_ESTABLISHED: > + if (!skb_queue_empty(&sk->sk_write_queue)) > + tipc_sk_push_backlog(tsk); > /* Accept only 
connection-based messages sent by peer */ > if (likely(con_msg && !err && pport == oport && pnode == onode)) > return true; > @@ -2959,6 +3036,7 @@ static int tipc_setsockopt(struct socket *sock, int > lvl, int opt, > case TIPC_SRC_DROPPABLE: > case TIPC_DEST_DROPPABLE: > case TIPC_CONN_TIMEOUT: > + case TIPC_NODELAY: > if (ol < sizeof(value)) > return -EINVAL; > if (get_user(value, (u32 __user *)ov)) > @@ -3007,6 +3085,10 @@ static int tipc_setsockopt(struct socket *sock, int > lvl, int opt, > case TIPC_GROUP_LEAVE: > res = tipc_sk_leave(tsk); > break; > + case TIPC_NODELAY: > + tsk->nodelay = !!value; > + tsk_set_nagle(tsk); > + break; > default: > res = -EINVAL; > } > _______________________________________________ tipc-discussion mailing list tipc-discussion@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/tipc-discussion