Re: [tipc-discussion] [net-next] tipc: improve message bundling algorithm

2019-11-03 Thread David Miller
From: Tuong Lien 
Date: Fri,  1 Nov 2019 09:58:57 +0700

> As mentioned in commit e95584a889e1 ("tipc: fix unlimited bundling of
> small messages"), the current message bundling algorithm is inefficient:
> it can generate bundles holding only a single payload message, which
> causes unnecessary overhead for both the sender and the receiver.
> 
> This commit re-designs the 'tipc_msg_make_bundle()' function (now
> renamed 'tipc_msg_try_bundle()') so that when the first message arrives,
> we just check it and keep a reference to it if it is suitable for
> bundling. The message buffer is put into the link backlog queue and
> processed as normal. Later, when another message arrives, we bundle it
> with the first one if possible, and so on. This way, a bundle, if one is
> really needed, always consists of at least two payload messages.
> Otherwise, we let the first buffer go its own way without any bundling,
> reducing the overhead to zero.
> 
> Moreover, since we now have both messages in hand, we can even optimize
> the 'tipc_msg_bundle()' function to bundle a very large message (size
> ~ MSS) with a small one, something the current algorithm cannot do,
> e.g. [1400-byte message] + [10-byte message] (MTU = 1500).
> 
> Acked-by: Ying Xue 
> Acked-by: Jon Maloy 
> Signed-off-by: Tuong Lien 

Applied.
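
For readers skimming the archive: below is a minimal userspace sketch of
the deferred-bundling idea described in the message above. All names,
constants and the 4-byte alignment are illustrative assumptions, not the
kernel API; the point is only that the first small message is merely
remembered as a bundling target, and a bundle only materializes when a
second message actually arrives.

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

#define MTU       1500
#define BUN_HDR     40        /* assumed bundle-header size (~INT_H_SIZE) */
#define ALIGN4(x) (((x) + 3u) & ~3u)

struct msg { size_t len; };

/* Per-importance bundling state: the candidate kept for a later merge */
struct backlog {
	struct msg *target;   /* first small message, not yet a bundle */
	size_t bundle_len;    /* total length once messages are merged */
	int msg_cnt;          /* number of payload messages collected  */
};

/* Deferred decision: remember the 1st message, merge from the 2nd on.
 * Returns false if 'm' is unsuitable for bundling at all. */
static bool try_bundle(struct backlog *b, struct msg *m)
{
	if (BUN_HDR + m->len >= MTU)    /* could never share a packet */
		return false;
	if (!b->target) {               /* 1st message: just keep it  */
		b->target = m;
		b->bundle_len = BUN_HDR + m->len;
		b->msg_cnt = 1;
		return true;
	}
	if (ALIGN4(b->bundle_len) + m->len <= MTU) {
		b->bundle_len = ALIGN4(b->bundle_len) + m->len;
		b->msg_cnt++;           /* merged: a real bundle now  */
		return true;
	}
	b->target = m;                  /* full: start over with 'm'  */
	b->bundle_len = BUN_HDR + m->len;
	b->msg_cnt = 1;
	return true;
}

int main(void)
{
	struct msg big = { 1400 }, small = { 10 };
	struct backlog b = { 0 };

	try_bundle(&b, &big);           /* only remembered, no bundle */
	try_bundle(&b, &small);         /* now a 2-message bundle     */
	printf("%d msgs, %zu bytes on wire\n", b.msg_cnt, b.bundle_len);
	return 0;
}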


___
tipc-discussion mailing list
tipc-discussion@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tipc-discussion


[tipc-discussion] [net-next] tipc: improve message bundling algorithm

2019-10-31 Thread Tuong Lien
As mentioned in commit e95584a889e1 ("tipc: fix unlimited bundling of
small messages"), the current message bundling algorithm is inefficient:
it can generate bundles holding only a single payload message, which
causes unnecessary overhead for both the sender and the receiver.

This commit re-designs the 'tipc_msg_make_bundle()' function (now
renamed 'tipc_msg_try_bundle()') so that when the first message arrives,
we just check it and keep a reference to it if it is suitable for
bundling. The message buffer is put into the link backlog queue and
processed as normal. Later, when another message arrives, we bundle it
with the first one if possible, and so on. This way, a bundle, if one is
really needed, always consists of at least two payload messages.
Otherwise, we let the first buffer go its own way without any bundling,
reducing the overhead to zero.

Moreover, since we now have both messages in hand, we can even optimize
the 'tipc_msg_bundle()' function to bundle a very large message (size
~ MSS) with a small one, something the current algorithm cannot do,
e.g. [1400-byte message] + [10-byte message] (MTU = 1500).

Acked-by: Ying Xue 
Acked-by: Jon Maloy 
Signed-off-by: Tuong Lien 
---
 net/tipc/link.c |  59 +++---
 net/tipc/msg.c  | 153 +---
 net/tipc/msg.h  |   5 +-
 3 files changed, 113 insertions(+), 104 deletions(-)

diff --git a/net/tipc/link.c b/net/tipc/link.c
index 7d7a66178607..038861bad72b 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -940,16 +940,17 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
   struct sk_buff_head *xmitq)
 {
struct tipc_msg *hdr = buf_msg(skb_peek(list));
-   unsigned int maxwin = l->window;
-   int imp = msg_importance(hdr);
-   unsigned int mtu = l->mtu;
+   struct sk_buff_head *backlogq = &l->backlogq;
+   struct sk_buff_head *transmq = &l->transmq;
+   struct sk_buff *skb, *_skb;
+   u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
u16 ack = l->rcv_nxt - 1;
u16 seqno = l->snd_nxt;
-   u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
-   struct sk_buff_head *transmq = &l->transmq;
-   struct sk_buff_head *backlogq = &l->backlogq;
-   struct sk_buff *skb, *_skb, **tskb;
int pkt_cnt = skb_queue_len(list);
+   int imp = msg_importance(hdr);
+   unsigned int maxwin = l->window;
+   unsigned int mtu = l->mtu;
+   bool new_bundle;
int rc = 0;
 
if (unlikely(msg_size(hdr) > mtu)) {
@@ -975,20 +976,18 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
}
 
/* Prepare each packet for sending, and add to relevant queue: */
-   while (skb_queue_len(list)) {
-   skb = skb_peek(list);
-   hdr = buf_msg(skb);
-   msg_set_seqno(hdr, seqno);
-   msg_set_ack(hdr, ack);
-   msg_set_bcast_ack(hdr, bc_ack);
-
+   while ((skb = __skb_dequeue(list))) {
if (likely(skb_queue_len(transmq) < maxwin)) {
+   hdr = buf_msg(skb);
+   msg_set_seqno(hdr, seqno);
+   msg_set_ack(hdr, ack);
+   msg_set_bcast_ack(hdr, bc_ack);
_skb = skb_clone(skb, GFP_ATOMIC);
if (!_skb) {
+   kfree_skb(skb);
__skb_queue_purge(list);
return -ENOBUFS;
}
-   __skb_dequeue(list);
__skb_queue_tail(transmq, skb);
/* next retransmit attempt */
if (link_is_bc_sndlink(l))
@@ -1000,22 +999,26 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
seqno++;
continue;
}
-   tskb = &l->backlog[imp].target_bskb;
-   if (tipc_msg_bundle(*tskb, hdr, mtu)) {
-   kfree_skb(__skb_dequeue(list));
-   l->stats.sent_bundled++;
-   continue;
-   }
-   if (tipc_msg_make_bundle(tskb, hdr, mtu, l->addr)) {
-   kfree_skb(__skb_dequeue(list));
-   __skb_queue_tail(backlogq, *tskb);
-   l->backlog[imp].len++;
-   l->stats.sent_bundled++;
-   l->stats.sent_bundles++;
+   if (tipc_msg_try_bundle(l->backlog[imp].target_bskb, &skb,
+   mtu - INT_H_SIZE, l->addr,
+   &new_bundle)) {
+   if (skb) {
+   /* Keep a ref. to the skb for next try */
+   l->backlog[imp].target_bskb = skb;
+   l->backlog[imp].len++;
+   
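
The archive cuts the hunk above short. Based on the visible lines and
the statistics counters in the removed code, here is a compilable toy
model of the new caller contract; every name is illustrative, and the
else-branch accounting is an assumption about the truncated part. A
true return with a non-NULL out-buffer means "keep this skb as the new
bundling target"; NULL means the buffer was absorbed into the existing
target.

#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

struct skb { int len; };

/* Stub with the out-parameter contract the hunk implies: return true
 * when the buffer was handled; leave *skb non-NULL for "keep as new
 * target", or NULL it out for "absorbed into the current target". */
static bool try_bundle_stub(struct skb *target, struct skb **skb,
			    bool *new_bundle)
{
	if (!target)
		return true;    /* no target yet: caller keeps *skb */
	*new_bundle = true;     /* 2nd message just made a bundle   */
	*skb = NULL;            /* buffer absorbed into the target  */
	return true;
}

int main(void)
{
	struct skb a = { 1400 }, b = { 10 };
	struct skb *input[] = { &a, &b };
	struct skb *target = NULL, *skb;
	bool new_bundle = false;
	int sent_bundles = 0, sent_bundled = 0, backlog_len = 0;

	for (int i = 0; i < 2; i++) {
		skb = input[i];
		if (!try_bundle_stub(target, &skb, &new_bundle))
			continue;       /* would be sent unbundled */
		if (skb) {              /* keep a ref. for next try */
			target = skb;
			backlog_len++;
		} else {                /* merged: update statistics */
			sent_bundled++;
			if (new_bundle) {
				sent_bundles++;
				sent_bundled++; /* kept target counts too */
			}
		}
	}
	printf("bundles=%d bundled=%d backlog=%d\n",
	       sent_bundles, sent_bundled, backlog_len);
	return 0;
}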

Re: [tipc-discussion] [net-next] tipc: improve message bundling algorithm

2019-10-31 Thread Xue, Ying
Sorry for the late response to this commit. Great change!

Acked-by: Ying Xue 

-----Original Message-----
From: Tuong Lien [mailto:tuong.t.l...@dektech.com.au] 
Sent: Tuesday, October 15, 2019 12:59 PM
To: tipc-discussion@lists.sourceforge.net; jon.ma...@ericsson.com; 
ma...@donjonn.com; Xue, Ying
Subject: [net-next] tipc: improve message bundling algorithm

As mentioned in commit e95584a889e1 ("tipc: fix unlimited bundling of
small messages"), the current message bundling algorithm is inefficient:
it can generate bundles holding only a single payload message, which
causes unnecessary overhead for both the sender and the receiver.

This commit re-designs the 'tipc_msg_make_bundle()' function (now
renamed 'tipc_msg_try_bundle()') so that when the first message arrives,
we just check it and keep a reference to it if it is suitable for
bundling. The message buffer is put into the link backlog queue and
processed as normal. Later, when another message arrives, we bundle it
with the first one if possible, and so on. This way, a bundle, if one is
really needed, always consists of at least two payload messages.
Otherwise, we let the first buffer go its own way without any bundling,
reducing the overhead to zero.

Moreover, since we now have both messages in hand, we can even optimize
the 'tipc_msg_bundle()' function to bundle a very large message (size
~ MSS) with a small one, something the current algorithm cannot do,
e.g. [1400-byte message] + [10-byte message] (MTU = 1500).

Signed-off-by: Tuong Lien 
---
 net/tipc/link.c |  60 +++---
 net/tipc/msg.c  | 153 +---
 net/tipc/msg.h  |   5 +-
 3 files changed, 114 insertions(+), 104 deletions(-)

diff --git a/net/tipc/link.c b/net/tipc/link.c
index 999eab592de8..3bd60bdbf56c 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -940,16 +940,17 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
   struct sk_buff_head *xmitq)
 {
struct tipc_msg *hdr = buf_msg(skb_peek(list));
-   unsigned int maxwin = l->window;
-   int imp = msg_importance(hdr);
-   unsigned int mtu = l->mtu;
+   struct sk_buff_head *backlogq = &l->backlogq;
+   struct sk_buff_head *transmq = &l->transmq;
+   struct sk_buff *skb, *_skb;
+   u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
u16 ack = l->rcv_nxt - 1;
u16 seqno = l->snd_nxt;
-   u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
-   struct sk_buff_head *transmq = &l->transmq;
-   struct sk_buff_head *backlogq = &l->backlogq;
-   struct sk_buff *skb, *_skb, **tskb;
int pkt_cnt = skb_queue_len(list);
+   int imp = msg_importance(hdr);
+   unsigned int maxwin = l->window;
+   unsigned int mtu = l->mtu;
+   bool new_bundle;
int rc = 0;
 
if (unlikely(msg_size(hdr) > mtu)) {
@@ -975,20 +976,18 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
}
 
/* Prepare each packet for sending, and add to relevant queue: */
-   while (skb_queue_len(list)) {
-   skb = skb_peek(list);
-   hdr = buf_msg(skb);
-   msg_set_seqno(hdr, seqno);
-   msg_set_ack(hdr, ack);
-   msg_set_bcast_ack(hdr, bc_ack);
-
+   while ((skb = __skb_dequeue(list))) {
if (likely(skb_queue_len(transmq) < maxwin)) {
+   hdr = buf_msg(skb);
+   msg_set_seqno(hdr, seqno);
+   msg_set_ack(hdr, ack);
+   msg_set_bcast_ack(hdr, bc_ack);
_skb = skb_clone(skb, GFP_ATOMIC);
if (!_skb) {
+   kfree_skb(skb);
__skb_queue_purge(list);
return -ENOBUFS;
}
-   __skb_dequeue(list);
__skb_queue_tail(transmq, skb);
/* next retransmit attempt */
if (link_is_bc_sndlink(l))
@@ -1000,22 +999,27 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
seqno++;
continue;
}
-   tskb = &l->backlog[imp].target_bskb;
-   if (tipc_msg_bundle(*tskb, hdr, mtu)) {
-   kfree_skb(__skb_dequeue(list));
-   l->stats.sent_bundled++;
-   continue;
-   }
-   if (tipc_msg_make_bundle(tskb, hdr, mtu, l->addr)) {
-   kfree_skb(__skb_dequeue(list));
-   __skb_queue_tail(backlogq, *tskb);
-   l->backlog[imp].len++;
-   l->stats.sent_bundled++;
-   l->stats.sent_bundles++;
+   if (tipc_msg_try_bundle(l->backlog[imp].target_bskb, &skb,
+   mtu - INT_H_SIZE, 
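
One detail visible just before the truncation above: the caller now
passes 'mtu - INT_H_SIZE' as the bundling limit. Assuming INT_H_SIZE is
TIPC's 40-byte internal header (the space a bundle header occupies) and
4-byte message alignment, the cover text's [1400] + [10] example works
out as below. This is a standalone arithmetic check, not the kernel's
exact inequality.

#include <assert.h>
#include <stdio.h>

#define MTU        1500
#define INT_H_SIZE   40                 /* assumed TIPC internal header */
#define ALIGN4(x)  (((x) + 3u) & ~3u)

int main(void)
{
	unsigned mss = MTU - INT_H_SIZE;    /* limit handed to try_bundle */
	/* bundle header + 1400-byte message, aligned, + 10-byte message */
	unsigned bundle = ALIGN4(INT_H_SIZE + 1400) + 10;   /* = 1450 */

	assert(bundle <= mss);              /* 1450 <= 1460: it fits */
	printf("bundle=%u, mss=%u -> fits\n", bundle, mss);
	return 0;
}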

[tipc-discussion] [net-next] tipc: improve message bundling algorithm

2019-10-14 Thread Tuong Lien
As mentioned in commit e95584a889e1 ("tipc: fix unlimited bundling of
small messages"), the current message bundling algorithm is inefficient:
it can generate bundles holding only a single payload message, which
causes unnecessary overhead for both the sender and the receiver.

This commit re-designs the 'tipc_msg_make_bundle()' function (now
renamed 'tipc_msg_try_bundle()') so that when the first message arrives,
we just check it and keep a reference to it if it is suitable for
bundling. The message buffer is put into the link backlog queue and
processed as normal. Later, when another message arrives, we bundle it
with the first one if possible, and so on. This way, a bundle, if one is
really needed, always consists of at least two payload messages.
Otherwise, we let the first buffer go its own way without any bundling,
reducing the overhead to zero.

Moreover, since we now have both messages in hand, we can even optimize
the 'tipc_msg_bundle()' function to bundle a very large message (size
~ MSS) with a small one, something the current algorithm cannot do,
e.g. [1400-byte message] + [10-byte message] (MTU = 1500).

Signed-off-by: Tuong Lien 
---
 net/tipc/link.c |  60 +++---
 net/tipc/msg.c  | 153 +---
 net/tipc/msg.h  |   5 +-
 3 files changed, 114 insertions(+), 104 deletions(-)

diff --git a/net/tipc/link.c b/net/tipc/link.c
index 999eab592de8..3bd60bdbf56c 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -940,16 +940,17 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
   struct sk_buff_head *xmitq)
 {
struct tipc_msg *hdr = buf_msg(skb_peek(list));
-   unsigned int maxwin = l->window;
-   int imp = msg_importance(hdr);
-   unsigned int mtu = l->mtu;
+   struct sk_buff_head *backlogq = &l->backlogq;
+   struct sk_buff_head *transmq = &l->transmq;
+   struct sk_buff *skb, *_skb;
+   u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
u16 ack = l->rcv_nxt - 1;
u16 seqno = l->snd_nxt;
-   u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
-   struct sk_buff_head *transmq = &l->transmq;
-   struct sk_buff_head *backlogq = &l->backlogq;
-   struct sk_buff *skb, *_skb, **tskb;
int pkt_cnt = skb_queue_len(list);
+   int imp = msg_importance(hdr);
+   unsigned int maxwin = l->window;
+   unsigned int mtu = l->mtu;
+   bool new_bundle;
int rc = 0;
 
if (unlikely(msg_size(hdr) > mtu)) {
@@ -975,20 +976,18 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
}
 
/* Prepare each packet for sending, and add to relevant queue: */
-   while (skb_queue_len(list)) {
-   skb = skb_peek(list);
-   hdr = buf_msg(skb);
-   msg_set_seqno(hdr, seqno);
-   msg_set_ack(hdr, ack);
-   msg_set_bcast_ack(hdr, bc_ack);
-
+   while ((skb = __skb_dequeue(list))) {
if (likely(skb_queue_len(transmq) < maxwin)) {
+   hdr = buf_msg(skb);
+   msg_set_seqno(hdr, seqno);
+   msg_set_ack(hdr, ack);
+   msg_set_bcast_ack(hdr, bc_ack);
_skb = skb_clone(skb, GFP_ATOMIC);
if (!_skb) {
+   kfree_skb(skb);
__skb_queue_purge(list);
return -ENOBUFS;
}
-   __skb_dequeue(list);
__skb_queue_tail(transmq, skb);
/* next retransmit attempt */
if (link_is_bc_sndlink(l))
@@ -1000,22 +999,27 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
seqno++;
continue;
}
-   tskb = &l->backlog[imp].target_bskb;
-   if (tipc_msg_bundle(*tskb, hdr, mtu)) {
-   kfree_skb(__skb_dequeue(list));
-   l->stats.sent_bundled++;
-   continue;
-   }
-   if (tipc_msg_make_bundle(tskb, hdr, mtu, l->addr)) {
-   kfree_skb(__skb_dequeue(list));
-   __skb_queue_tail(backlogq, *tskb);
-   l->backlog[imp].len++;
-   l->stats.sent_bundled++;
-   l->stats.sent_bundles++;
+   if (tipc_msg_try_bundle(l->backlog[imp].target_bskb, &skb,
+   mtu - INT_H_SIZE, l->addr,
+   &new_bundle)) {
+   if (skb) {
+   /* Keep a ref. to the skb for next try */
+   l->backlog[imp].target_bskb = skb;
+   l->backlog[imp].len++;
+   __skb_queue_tail(backlogq, skb);
+
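
The remaining hunks are truncated by the archive. To close the thread
summary, here is a toy contrast between the admission rules before and
after the patch; the old max/2 threshold is an assumption recalled from
the removed tipc_msg_make_bundle(), so treat it as illustrative rather
than authoritative.

#include <stdbool.h>
#include <stdio.h>

#define MTU        1500
#define INT_H_SIZE   40

/* Assumed old rule: only a message up to half the available space could
 * start a (speculative) bundle, so ~MSS messages never qualified. */
static bool old_can_start_bundle(unsigned msz)
{
	unsigned max = MTU - INT_H_SIZE;
	return msz <= max / 2;          /* 1400 > 730: rejected */
}

/* New rule: any message leaving room for a second one may be kept as a
 * target; the bundle header is only added once a companion shows up. */
static bool new_can_be_target(unsigned msz)
{
	return INT_H_SIZE + msz < MTU - INT_H_SIZE;   /* 1440 < 1460 */
}

int main(void)
{
	printf("1400-byte msg as bundle starter: old=%d new=%d\n",
	       old_can_start_bundle(1400), new_can_be_target(1400));
	return 0;
}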