-----Original Message-----
From: Jon Maloy <jma...@redhat.com>
Sent: Tuesday, March 17, 2020 2:49 AM
To: Tuong Lien Tong <tuong.t.l...@dektech.com.au>;
tipc-discussion@lists.sourceforge.net; ma...@donjonn.com;
ying....@windriver.com; l...@redhat.com
Subject: Re: [tipc-discussion] [PATCH RFC 1/2] tipc: add Gap ACK
blocks support for broadcast link
On 3/16/20 2:18 PM, Jon Maloy wrote:
>
>
> On 3/16/20 7:23 AM, Tuong Lien Tong wrote:
>>
[...]
>>
> The improvement shown here is truly impressive. However, you are only
> showing tipc-pipe with small messages. How does this look when you
> send full-size 66k messages? How does it scale when the number of
> destinations grows up to tens or even hundreds? I am particularly
> concerned that the use of unicast retransmission may become a
> sub-optimization if the number of destinations is large.
>
> ///jon
You should try the "multicast_blast" program under tipc-utils/test. That
will give you
numbers both on throughput and loss rates as you let the number of nodes
grow.
///jon
>
>> BR/Tuong
>>
>> *From:* Jon Maloy <jma...@redhat.com <mailto:jma...@redhat.com>>
>> *Sent:* Friday, March 13, 2020 10:47 PM
>> *To:* Tuong Lien <tuong.t.l...@dektech.com.au
<mailto:tuong.t.l...@dektech.com.au>>;
>> tipc-discussion@lists.sourceforge.net
<mailto:tipc-discussion@lists.sourceforge.net>; ma...@donjonn.com
<mailto:ma...@donjonn.com>;
>> ying....@windriver.com <mailto:ying....@windriver.com>
>> *Subject:* Re: [PATCH RFC 1/2] tipc: add Gap ACK blocks support for
>> broadcast link
>>
>> On 3/13/20 6:47 AM, Tuong Lien wrote:
>>
>> As achieved through commit 9195948fbf34 ("tipc: improve TIPC
>> throughput
>>
>> by Gap ACK blocks"), we apply the same mechanism for the
>> broadcast link
>>
>> as well. The 'Gap ACK blocks' data field in a
>> 'PROTOCOL/STATE_MSG' will
>>
>> consist of two parts built for both the broadcast and unicast
types:
>>
>> 31 16 15 0
>>
>> +-------------+-------------+-------------+-------------+
>>
>> | bgack_cnt | ugack_cnt | len |
>>
>> +-------------+-------------+-------------+-------------+ -
>>
>> | gap | ack | |
>>
>> +-------------+-------------+-------------+-------------+ > bc gacks
>>
>> : : : |
>>
>> +-------------+-------------+-------------+-------------+ -
>>
>> | gap | ack | |
>>
>> +-------------+-------------+-------------+-------------+ > uc gacks
>>
>> : : : |
>>
>> +-------------+-------------+-------------+-------------+ -
>>
>> which is "automatically" backward-compatible.
>>
>> We also increase the max number of Gap ACK blocks to 128,
>> allowing upto
>>
>> 64 blocks per type (total buffer size = 516 bytes).
>>
>> Besides, the 'tipc_link_advance_transmq()' function is refactored
>> which
>>
>> is applicable for both the unicast and broadcast cases now, so
>> some old
>>
>> functions can be removed and the code is optimized.
>>
>> With the patch, TIPC broadcast is more robust regardless of
>> packet loss
>>
>> or disorder, latency, ... in the underlying network. Its
>> performance is
>>
>> boost up significantly.
>>
>> For example, experiment with a 5% packet loss rate results:
>>
>> $ time tipc-pipe --mc --rdm --data_size 123 --data_num 1500000
>>
>> real 0m 42.46s
>>
>> user 0m 1.16s
>>
>> sys 0m 17.67s
>>
>> Without the patch:
>>
>> $ time tipc-pipe --mc --rdm --data_size 123 --data_num 1500000
>>
>> real 5m 28.80s
>>
>> user 0m 0.85s
>>
>> sys 0m 3.62s
>>
>> Can you explain this? To me it seems like the elapsed time is reduced
>> with a factor 328.8/42.46=7.7, while we are consuming significantly
>> more CPU to achieve this. Doesn't that mean that we have much more
>> retransmissions which are consuming CPU? Or is there some other
>> explanation?
>>
>> ///jon
>>
>>
>> Signed-off-by: Tuong Lien<tuong.t.l...@dektech.com.au
<mailto:tuong.t.l...@dektech.com.au>>
>> <mailto:tuong.t.l...@dektech.com.au>
>>
>> ---
>>
>> net/tipc/bcast.c | 9 +-
>>
>> net/tipc/link.c | 440
>> +++++++++++++++++++++++++++++++++----------------------
>>
>> net/tipc/link.h | 7 +-
>>
>> net/tipc/msg.h | 14 +-
>>
>> net/tipc/node.c | 10 +-
>>
>> 5 files changed, 295 insertions(+), 185 deletions(-)
>>
>> diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
>>
>> index 4c20be08b9c4..3ce690a96ee9 100644
>>
>> --- a/net/tipc/bcast.c
>>
>> +++ b/net/tipc/bcast.c
>>
>> @@ -474,7 +474,7 @@ void tipc_bcast_ack_rcv(struct net *net,
>> struct tipc_link *l,
>>
>> __skb_queue_head_init(&xmitq);
>>
>>
>> tipc_bcast_lock(net);
>>
>> - tipc_link_bc_ack_rcv(l, acked, &xmitq);
>>
>> + tipc_link_bc_ack_rcv(l, acked, 0, NULL, &xmitq);
>>
>> tipc_bcast_unlock(net);
>>
>>
>> tipc_bcbase_xmit(net, &xmitq);
>>
>> @@ -492,6 +492,7 @@ int tipc_bcast_sync_rcv(struct net *net,
>> struct tipc_link *l,
>>
>> struct tipc_msg *hdr)
>>
>> {
>>
>> struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
>>
>> + struct tipc_gap_ack_blks *ga;
>>
>> struct sk_buff_head xmitq;
>>
>> int rc = 0;
>>
>>
>> @@ -501,8 +502,10 @@ int tipc_bcast_sync_rcv(struct net *net,
>> struct tipc_link *l,
>>
>> if (msg_type(hdr) != STATE_MSG) {
>>
>> tipc_link_bc_init_rcv(l, hdr);
>>
>> } else if (!msg_bc_ack_invalid(hdr)) {
>>
>> - tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq);
>>
>> - rc = tipc_link_bc_sync_rcv(l, hdr, &xmitq);
>>
>> + tipc_get_gap_ack_blks(&ga, l, hdr, false);
>>
>> + rc = tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr),
>>
>> + msg_bc_gap(hdr), ga, &xmitq);
>>
>> + rc |= tipc_link_bc_sync_rcv(l, hdr, &xmitq);
>>
>> }
>>
>> tipc_bcast_unlock(net);
>>
>>
>> diff --git a/net/tipc/link.c b/net/tipc/link.c
>>
>> index 467c53a1fb5c..6198b6d89a69 100644
>>
>> --- a/net/tipc/link.c
>>
>> +++ b/net/tipc/link.c
>>
>> @@ -188,6 +188,8 @@ struct tipc_link {
>>
>> /* Broadcast */
>>
>> u16 ackers;
>>
>> u16 acked;
>>
>> + u16 last_gap;
>>
>> + struct tipc_gap_ack_blks *last_ga;
>>
>> struct tipc_link *bc_rcvlink;
>>
>> struct tipc_link *bc_sndlink;
>>
>> u8 nack_state;
>>
>> @@ -249,11 +251,14 @@ static int tipc_link_build_nack_msg(struct
>> tipc_link *l,
>>
>> struct sk_buff_head *xmitq);
>>
>> static void tipc_link_build_bc_init_msg(struct tipc_link *l,
>>
>> struct sk_buff_head *xmitq);
>>
>> -static int tipc_link_release_pkts(struct tipc_link *l, u16 to);
>>
>> -static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void
>> *data, u16 gap);
>>
>> -static int tipc_link_advance_transmq(struct tipc_link *l, u16
>> acked, u16 gap,
>>
>> +static u8 __tipc_build_gap_ack_blks(struct tipc_gap_ack_blks *ga,
>>
>> + struct tipc_link *l, u8 start_index);
>>
>> +static u16 tipc_build_gap_ack_blks(struct tipc_link *l, struct
>> tipc_msg *hdr);
>>
>> +static int tipc_link_advance_transmq(struct tipc_link *l, struct
>> tipc_link *r,
>>
>> + u16 acked, u16 gap,
>>
>> struct tipc_gap_ack_blks *ga,
>>
>> - struct sk_buff_head *xmitq);
>>
>> + struct sk_buff_head *xmitq,
>>
>> + bool *retransmitted, int *rc);
>>
>> static void tipc_link_update_cwin(struct tipc_link *l, int
>> released,
>>
>> bool retransmitted);
>>
>> /*
>>
>> @@ -370,7 +375,7 @@ void tipc_link_remove_bc_peer(struct
>> tipc_link *snd_l,
>>
>> snd_l->ackers--;
>>
>> rcv_l->bc_peer_is_up = true;
>>
>> rcv_l->state = LINK_ESTABLISHED;
>>
>> - tipc_link_bc_ack_rcv(rcv_l, ack, xmitq);
>>
>> + tipc_link_bc_ack_rcv(rcv_l, ack, 0, NULL, xmitq);
>>
>> trace_tipc_link_reset(rcv_l, TIPC_DUMP_ALL, "bclink removed!");
>>
>> tipc_link_reset(rcv_l);
>>
>> rcv_l->state = LINK_RESET;
>>
>> @@ -784,8 +789,6 @@ bool tipc_link_too_silent(struct tipc_link *l)
>>
>> return (l->silent_intv_cnt + 2 > l->abort_limit);
>>
>> }
>>
>>
>> -static int tipc_link_bc_retrans(struct tipc_link *l, struct
>> tipc_link *r,
>>
>> - u16 from, u16 to, struct sk_buff_head
>> *xmitq);
>>
>> /* tipc_link_timeout - perform periodic task as instructed from
>> node timeout
>>
>> */
>>
>> int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head
>> *xmitq)
>>
>> @@ -948,6 +951,9 @@ void tipc_link_reset(struct tipc_link *l)
>>
>> l->snd_nxt_state = 1;
>>
>> l->rcv_nxt_state = 1;
>>
>> l->acked = 0;
>>
>> + l->last_gap = 0;
>>
>> + kfree(l->last_ga);
>>
>> + l->last_ga = NULL;
>>
>> l->silent_intv_cnt = 0;
>>
>> l->rst_cnt = 0;
>>
>> l->bc_peer_is_up = false;
>>
>> @@ -1183,68 +1189,14 @@ static bool
>> link_retransmit_failure(struct tipc_link *l, struct tipc_link *r,
>>
>>
>> if (link_is_bc_sndlink(l)) {
>>
>> r->state = LINK_RESET;
>>
>> - *rc = TIPC_LINK_DOWN_EVT;
>>
>> + *rc |= TIPC_LINK_DOWN_EVT;
>>
>> } else {
>>
>> - *rc = tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
>>
>> + *rc |= tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
>>
>> }
>>
>>
>> return true;
>>
>> }
>>
>>
>> -/* tipc_link_bc_retrans() - retransmit zero or more packets
>>
>> - * @l: the link to transmit on
>>
>> - * @r: the receiving link ordering the retransmit. Same as l if
>> unicast
>>
>> - * @from: retransmit from (inclusive) this sequence number
>>
>> - * @to: retransmit to (inclusive) this sequence number
>>
>> - * xmitq: queue for accumulating the retransmitted packets
>>
>> - */
>>
>> -static int tipc_link_bc_retrans(struct tipc_link *l, struct
>> tipc_link *r,
>>
>> - u16 from, u16 to, struct sk_buff_head
>> *xmitq)
>>
>> -{
>>
>> - struct sk_buff *_skb, *skb = skb_peek(&l->transmq);
>>
>> - u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
>>
>> - u16 ack = l->rcv_nxt - 1;
>>
>> - int retransmitted = 0;
>>
>> - struct tipc_msg *hdr;
>>
>> - int rc = 0;
>>
>> -
>>
>> - if (!skb)
>>
>> - return 0;
>>
>> - if (less(to, from))
>>
>> - return 0;
>>
>> -
>>
>> - trace_tipc_link_retrans(r, from, to, &l->transmq);
>>
>> -
>>
>> - if (link_retransmit_failure(l, r, &rc))
>>
>> - return rc;
>>
>> -
>>
>> - skb_queue_walk(&l->transmq, skb) {
>>
>> - hdr = buf_msg(skb);
>>
>> - if (less(msg_seqno(hdr), from))
>>
>> - continue;
>>
>> - if (more(msg_seqno(hdr), to))
>>
>> - break;
>>
>> - if (time_before(jiffies, TIPC_SKB_CB(skb)->nxt_retr))
>>
>> - continue;
>>
>> - TIPC_SKB_CB(skb)->nxt_retr = TIPC_BC_RETR_LIM;
>>
>> - _skb = pskb_copy(skb, GFP_ATOMIC);
>>
>> - if (!_skb)
>>
>> - return 0;
>>
>> - hdr = buf_msg(_skb);
>>
>> - msg_set_ack(hdr, ack);
>>
>> - msg_set_bcast_ack(hdr, bc_ack);
>>
>> - _skb->priority = TC_PRIO_CONTROL;
>>
>> - __skb_queue_tail(xmitq, _skb);
>>
>> - l->stats.retransmitted++;
>>
>> - retransmitted++;
>>
>> - /* Increase actual retrans counter & mark first time */
>>
>> - if (!TIPC_SKB_CB(skb)->retr_cnt++)
>>
>> - TIPC_SKB_CB(skb)->retr_stamp = jiffies;
>>
>> - }
>>
>> - tipc_link_update_cwin(l, 0, retransmitted);
>>
>> - return 0;
>>
>> -}
>>
>> -
>>
>> /* tipc_data_input - deliver data and name distr msgs to upper
>> layer
>>
>> *
>>
>> * Consumes buffer if message is of right type
>>
>> @@ -1402,46 +1354,71 @@ static int tipc_link_tnl_rcv(struct
>> tipc_link *l, struct sk_buff *skb,
>>
>> return rc;
>>
>> }
>>
>>
>> -static int tipc_link_release_pkts(struct tipc_link *l, u16 acked)
>>
>> -{
>>
>> - int released = 0;
>>
>> - struct sk_buff *skb, *tmp;
>>
>> -
>>
>> - skb_queue_walk_safe(&l->transmq, skb, tmp) {
>>
>> - if (more(buf_seqno(skb), acked))
>>
>> - break;
>>
>> - __skb_unlink(skb, &l->transmq);
>>
>> - kfree_skb(skb);
>>
>> - released++;
>>
>> +/**
>>
>> + * tipc_get_gap_ack_blks - get Gap ACK blocks from
>> PROTOCOL/STATE_MSG
>>
>> + * @ga: returned pointer to the Gap ACK blocks if any
>>
>> + * @l: the tipc link
>>
>> + * @hdr: the PROTOCOL/STATE_MSG header
>>
>> + * @uc: desired Gap ACK blocks type, i.e. unicast (= 1) or
>> broadcast (= 0)
>>
>> + *
>>
>> + * Return: the total Gap ACK blocks size
>>
>> + */
>>
>> +u16 tipc_get_gap_ack_blks(struct tipc_gap_ack_blks **ga, struct
>> tipc_link *l,
>>
>> + struct tipc_msg *hdr, bool uc)
>>
>> +{
>>
>> + struct tipc_gap_ack_blks *p;
>>
>> + u16 sz = 0;
>>
>> +
>>
>> + /* Does peer support the Gap ACK blocks feature? */
>>
>> + if (l->peer_caps & TIPC_GAP_ACK_BLOCK) {
>>
>> + p = (struct tipc_gap_ack_blks *)msg_data(hdr);
>>
>> + sz = ntohs(p->len);
>>
>> + /* Sanity check */
>>
>> + if (sz == tipc_gap_ack_blks_sz(p->ugack_cnt +
>> p->bgack_cnt)) {
>>
>> + /* Good, check if the desired type exists */
>>
>> + if ((uc && p->ugack_cnt) || (!uc && p->bgack_cnt))
>>
>> + goto ok;
>>
>> + /* Backward compatible: peer might not support bc, but
>> uc? */
>>
>> + } else if (uc && sz ==
>> tipc_gap_ack_blks_sz(p->ugack_cnt)) {
>>
>> + if (p->ugack_cnt) {
>>
>> + p->bgack_cnt = 0;
>>
>> + goto ok;
>>
>> + }
>>
>> + }
>>
>> }
>>
>> - return released;
>>
>> + /* Other cases: ignore! */
>>
>> + p = NULL;
>>
>> +
>>
>> +ok:
>>
>> + *ga = p;
>>
>> + return sz;
>>
>> }
>>
>>
>> -/* tipc_build_gap_ack_blks - build Gap ACK blocks
>>
>> - * @l: tipc link that data have come with gaps in sequence if any
>>
>> - * @data: data buffer to store the Gap ACK blocks after built
>>
>> - *
>>
>> - * returns the actual allocated memory size
>>
>> - */
>>
>> -static u16 tipc_build_gap_ack_blks(struct tipc_link *l, void
>> *data, u16 gap)
>>
>> +static u8 __tipc_build_gap_ack_blks(struct tipc_gap_ack_blks *ga,
>>
>> + struct tipc_link *l, u8 start_index)
>>
>> {
>>
>> + struct tipc_gap_ack *gacks = &ga->gacks[start_index];
>>
>> struct sk_buff *skb = skb_peek(&l->deferdq);
>>
>> - struct tipc_gap_ack_blks *ga = data;
>>
>> - u16 len, expect, seqno = 0;
>>
>> + u16 expect, seqno = 0;
>>
>> u8 n = 0;
>>
>>
>> - if (!skb || !gap)
>>
>> - goto exit;
>>
>> + if (!skb)
>>
>> + return 0;
>>
>>
>> expect = buf_seqno(skb);
>>
>> skb_queue_walk(&l->deferdq, skb) {
>>
>> seqno = buf_seqno(skb);
>>
>> if (unlikely(more(seqno, expect))) {
>>
>> - ga->gacks[n].ack = htons(expect - 1);
>>
>> - ga->gacks[n].gap = htons(seqno - expect);
>>
>> - if (++n >= MAX_GAP_ACK_BLKS) {
>>
>> - pr_info_ratelimited("Too few Gap ACK
>> blocks!\n");
>>
>> - goto exit;
>>
>> + gacks[n].ack = htons(expect - 1);
>>
>> + gacks[n].gap = htons(seqno - expect);
>>
>> + if (++n >= MAX_GAP_ACK_BLKS / 2) {
>>
>> + char buf[TIPC_MAX_LINK_NAME];
>>
>> +
>>
>> + pr_info_ratelimited("Gacks on %s: %d,
>> ql: %d!\n",
>>
>> + tipc_link_name_ext(l, buf),
>>
>> + n,
>>
>> + skb_queue_len(&l->deferdq));
>>
>> + return n;
>>
>> }
>>
>> } else if (unlikely(less(seqno, expect))) {
>>
>> pr_warn("Unexpected skb in deferdq!\n");
>>
>> @@ -1451,14 +1428,57 @@ static u16 tipc_build_gap_ack_blks(struct
>> tipc_link *l, void *data, u16 gap)
>>
>> }
>>
>>
>> /* last block */
>>
>> - ga->gacks[n].ack = htons(seqno);
>>
>> - ga->gacks[n].gap = 0;
>>
>> + gacks[n].ack = htons(seqno);
>>
>> + gacks[n].gap = 0;
>>
>> n++;
>>
>> + return n;
>>
>> +}
>>
>>
>> -exit:
>>
>> - len = tipc_gap_ack_blks_sz(n);
>>
>> +/* tipc_build_gap_ack_blks - build Gap ACK blocks
>>
>> + * @l: tipc unicast link
>>
>> + * @hdr: the tipc message buffer to store the Gap ACK blocks
>> after built
>>
>> + *
>>
>> + * The function builds Gap ACK blocks for both the unicast &
>> broadcast receiver
>>
>> + * links of a certain peer, the buffer after built has the
>> network data format
>>
>> + * as follows:
>>
>> + * 31 16 15 0
>>
>> + * +-------------+-------------+-------------+-------------+
>>
>> + * | bgack_cnt | ugack_cnt | len |
>>
>> + * +-------------+-------------+-------------+-------------+ -
>>
>> + * | gap | ack | |
>>
>> + * +-------------+-------------+-------------+-------------+ >
>> bc gacks
>>
>> + * : : : |
>>
>> + * +-------------+-------------+-------------+-------------+ -
>>
>> + * | gap | ack | |
>>
>> + * +-------------+-------------+-------------+-------------+ >
>> uc gacks
>>
>> + * : : : |
>>
>> + * +-------------+-------------+-------------+-------------+ -
>>
>> + * (See struct tipc_gap_ack_blks)
>>
>> + *
>>
>> + * returns the actual allocated memory size
>>
>> + */
>>
>> +static u16 tipc_build_gap_ack_blks(struct tipc_link *l, struct
>> tipc_msg *hdr)
>>
>> +{
>>
>> + struct tipc_link *bcl = l->bc_rcvlink;
>>
>> + struct tipc_gap_ack_blks *ga;
>>
>> + u16 len;
>>
>> +
>>
>> + ga = (struct tipc_gap_ack_blks *)msg_data(hdr);
>>
>> +
>>
>> + /* Start with broadcast link first */
>>
>> + tipc_bcast_lock(bcl->net);
>>
>> + msg_set_bcast_ack(hdr, bcl->rcv_nxt - 1);
>>
>> + msg_set_bc_gap(hdr, link_bc_rcv_gap(bcl));
>>
>> + ga->bgack_cnt = __tipc_build_gap_ack_blks(ga, bcl, 0);
>>
>> + tipc_bcast_unlock(bcl->net);
>>
>> +
>>
>> + /* Now for unicast link, but an explicit NACK only (???) */
>>
>> + ga->ugack_cnt = (msg_seq_gap(hdr)) ?
>>
>> + __tipc_build_gap_ack_blks(ga, l, ga->bgack_cnt)
>> : 0;
>>
>> +
>>
>> + /* Total len */
>>
>> + len = tipc_gap_ack_blks_sz(ga->bgack_cnt + ga->ugack_cnt);
>>
>> ga->len = htons(len);
>>
>> - ga->gack_cnt = n;
>>
>> return len;
>>
>> }
>>
>>
>> @@ -1466,47 +1486,111 @@ static u16
>> tipc_build_gap_ack_blks(struct tipc_link *l, void *data, u16 gap)
>>
>> * acked packets, also doing
>> retransmissions if
>>
>> * gaps found
>>
>> * @l: tipc link with transmq queue to be advanced
>>
>> + * @r: tipc link "receiver" i.e. in case of broadcast (= "l" if
>> unicast)
>>
>> * @acked: seqno of last packet acked by peer without any gaps
>> before
>>
>> * @gap: # of gap packets
>>
>> * @ga: buffer pointer to Gap ACK blocks from peer
>>
>> * @xmitq: queue for accumulating the retransmitted packets
if any
>>
>> + * @retransmitted: returned boolean value if a retransmission is
>> really issued
>>
>> + * @rc: returned code e.g. TIPC_LINK_DOWN_EVT if a repeated
>> retransmit failures
>>
>> + * happens (- unlikely case)
>>
>> *
>>
>> - * In case of a repeated retransmit failures, the call will
>> return shortly
>>
>> - * with a returned code (e.g. TIPC_LINK_DOWN_EVT)
>>
>> + * Return: the number of packets released from the link transmq
>>
>> */
>>
>> -static int tipc_link_advance_transmq(struct tipc_link *l, u16
>> acked, u16 gap,
>>
>> +static int tipc_link_advance_transmq(struct tipc_link *l, struct
>> tipc_link *r,
>>
>> + u16 acked, u16 gap,
>>
>> struct tipc_gap_ack_blks *ga,
>>
>> - struct sk_buff_head *xmitq)
>>
>> + struct sk_buff_head *xmitq,
>>
>> + bool *retransmitted, int *rc)
>>
>> {
>>
>> + struct tipc_gap_ack_blks *last_ga = r->last_ga, *this_ga = NULL;
>>
>> + struct tipc_gap_ack *gacks = NULL;
>>
>> struct sk_buff *skb, *_skb, *tmp;
>>
>> struct tipc_msg *hdr;
>>
>> + u32 qlen = skb_queue_len(&l->transmq);
>>
>> + u16 nacked = acked, ngap = gap, gack_cnt = 0;
>>
>> u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
>>
>> - bool retransmitted = false;
>>
>> u16 ack = l->rcv_nxt - 1;
>>
>> - bool passed = false;
>>
>> - u16 released = 0;
>>
>> u16 seqno, n = 0;
>>
>> - int rc = 0;
>>
>> + u16 end = r->acked, start = end, offset = r->last_gap;
>>
>> + u16 si = (last_ga) ? last_ga->start_index : 0;
>>
>> + bool is_uc = !link_is_bc_sndlink(l);
>>
>> + bool bc_has_acked = false;
>>
>> +
>>
>> + trace_tipc_link_retrans(r, acked + 1, acked + gap, &l->transmq);
>>
>> +
>>
>> + /* Determine Gap ACK blocks if any for the particular link */
>>
>> + if (ga && is_uc) {
>>
>> + /* Get the Gap ACKs, uc part */
>>
>> + gack_cnt = ga->ugack_cnt;
>>
>> + gacks = &ga->gacks[ga->bgack_cnt];
>>
>> + } else if (ga) {
>>
>> + /* Copy the Gap ACKs, bc part, for later renewal if
>> needed */
>>
>> + this_ga = kmemdup(ga, tipc_gap_ack_blks_sz(ga->bgack_cnt),
>>
>> + GFP_ATOMIC);
>>
>> + if (likely(this_ga)) {
>>
>> + this_ga->start_index = 0;
>>
>> + /* Start with the bc Gap ACKs */
>>
>> + gack_cnt = this_ga->bgack_cnt;
>>
>> + gacks = &this_ga->gacks[0];
>>
>> + } else {
>>
>> + /* Hmm, we can get in trouble..., simply ignore
>> it */
>>
>> + pr_warn_ratelimited("Ignoring bc Gap ACKs, no
>> memory\n");
>>
>> + }
>>
>> + }
>>
>>
>> + /* Advance the link transmq */
>>
>> skb_queue_walk_safe(&l->transmq, skb, tmp) {
>>
>> seqno = buf_seqno(skb);
>>
>>
>> next_gap_ack:
>>
>> - if (less_eq(seqno, acked)) {
>>
>> + if (less_eq(seqno, nacked)) {
>>
>> + if (is_uc)
>>
>> + goto release;
>>
>> + /* Skip packets peer has already acked */
>>
>> + if (!more(seqno, r->acked))
>>
>> + continue;
>>
>> + /* Get the next of last Gap ACK blocks */
>>
>> + while (more(seqno, end)) {
>>
>> + if (!last_ga || si >= last_ga->bgack_cnt)
>>
>> + break;
>>
>> + start = end + offset + 1;
>>
>> + end = ntohs(last_ga->gacks[si].ack);
>>
>> + offset = ntohs(last_ga->gacks[si].gap);
>>
>> + si++;
>>
>> + WARN_ONCE(more(start, end) ||
>>
>> + (!offset &&
>>
>> + si < last_ga->bgack_cnt) ||
>>
>> + si > MAX_GAP_ACK_BLKS,
>>
>> + "Corrupted Gap ACK: %d %d %d %d
>> %d\n",
>>
>> + start, end, offset, si,
>>
>> + last_ga->bgack_cnt);
>>
>> + }
>>
>> + /* Check against the last Gap ACK block */
>>
>> + if (in_range(seqno, start, end))
>>
>> + continue;
>>
>> + /* Update/release the packet peer is acking */
>>
>> + bc_has_acked = true;
>>
>> + if (--TIPC_SKB_CB(skb)->ackers)
>>
>> + continue;
>>
>> +release:
>>
>> /* release skb */
>>
>> __skb_unlink(skb, &l->transmq);
>>
>> kfree_skb(skb);
>>
>> - released++;
>>
>> - } else if (less_eq(seqno, acked + gap)) {
>>
>> - /* First, check if repeated retrans failures
>> occurs? */
>>
>> - if (!passed && link_retransmit_failure(l, l, &rc))
>>
>> - return rc;
>>
>> - passed = true;
>>
>> -
>>
>> + } else if (less_eq(seqno, nacked + ngap)) {
>>
>> + /* First gap: check if repeated retrans
>> failures? */
>>
>> + if (unlikely(seqno == acked + 1 &&
>>
>> + link_retransmit_failure(l, r, rc))) {
>>
>> + /* Ignore this bc Gap ACKs if any */
>>
>> + kfree(this_ga);
>>
>> + this_ga = NULL;
>>
>> + break;
>>
>> + }
>>
>> /* retransmit skb if unrestricted*/
>>
>> if (time_before(jiffies,
>> TIPC_SKB_CB(skb)->nxt_retr))
>>
>> �� continue;
>>
>> - TIPC_SKB_CB(skb)->nxt_retr = TIPC_UC_RETR_TIME;
>>
>> + TIPC_SKB_CB(skb)->nxt_retr = (is_uc) ?
>>
>> + TIPC_UC_RETR_TIME :
>> TIPC_BC_RETR_LIM;
>>
>> _skb = pskb_copy(skb, GFP_ATOMIC);
>>
>> if (!_skb)
>>
>> continue;
>>
>> @@ -1516,25 +1600,50 @@ static int
>> tipc_link_advance_transmq(struct tipc_link *l, u16 acked, u16 gap,
>>
>> _skb->priority = TC_PRIO_CONTROL;
>>
>> __skb_queue_tail(xmitq, _skb);
>>
>> l->stats.retransmitted++;
>>
>> - retransmitted = true;
>>
>> + *retransmitted = true;
>>
>> /* Increase actual retrans counter & mark first
>> time */
>>
>> if (!TIPC_SKB_CB(skb)->retr_cnt++)
>>
>> TIPC_SKB_CB(skb)->retr_stamp = jiffies;
>>
>> } else {
>>
>> /* retry with Gap ACK blocks if any */
>>
>> - if (!ga || n >= ga->gack_cnt)
>>
>> + if (n >= gack_cnt)
>>
>> break;
>>
>> - acked = ntohs(ga->gacks[n].ack);
>>
>> - gap = ntohs(ga->gacks[n].gap);
>>
>> + nacked = ntohs(gacks[n].ack);
>>
>> + ngap = ntohs(gacks[n].gap);
>>
>> n++;
>>
>> goto next_gap_ack;
>>
>> }
>>
>> }
>>
>> - if (released || retransmitted)
>>
>> - tipc_link_update_cwin(l, released, retransmitted);
>>
>> - if (released)
>>
>> - tipc_link_advance_backlog(l, xmitq);
>>
>> - return 0;
>>
>> +
>>
>> + /* Renew last Gap ACK blocks for bc if needed */
>>
>> + if (bc_has_acked) {
>>
>> + if (this_ga) {
>>
>> + kfree(last_ga);
>>
>> + r->last_ga = this_ga;
>>
>> + r->last_gap = gap;
>>
>> + } else if (last_ga) {
>>
>> + if (less(acked, start)) {
>>
>> + si--;
>>
>> + offset = start - acked - 1;
>>
>> + } else if (less(acked, end)) {
>>
>> + acked = end;
>>
>> + }
>>
>> + if (si < last_ga->bgack_cnt) {
>>
>> + last_ga->start_index = si;
>>
>> + r->last_gap = offset;
>>
>> + } else {
>>
>> + kfree(last_ga);
>>
>> + r->last_ga = NULL;
>>
>> + r->last_gap = 0;
>>
>> + }
>>
>> + } else {
>>
>> + r->last_gap = 0;
>>
>> + }
>>
>> + r->acked = acked;
>>
>> + } else {
>>
>> + kfree(this_ga);
>>
>> + }
>>
>> + return skb_queue_len(&l->transmq) - qlen;
>>
>> }
>>
>>
>> /* tipc_link_build_state_msg: prepare link state message for
>> transmission
>>
>> @@ -1651,7 +1760,8 @@ int tipc_link_rcv(struct tipc_link *l,
>> struct sk_buff *skb,
>>
>> kfree_skb(skb);
>>
>> break;
>>
>> }
>>
>> - released += tipc_link_release_pkts(l, msg_ack(hdr));
>>
>> + released += tipc_link_advance_transmq(l, l,
>> msg_ack(hdr), 0,
>>
>> + NULL, NULL, NULL,
>> NULL);
>>
>>
>> /* Defer delivery if sequence gap */
>>
>> if (unlikely(seqno != rcv_nxt)) {
>>
>> @@ -1739,7 +1849,7 @@ static void
>> tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
>>
>> msg_set_probe(hdr, probe);
>>
>> msg_set_is_keepalive(hdr, probe || probe_reply);
>>
>> if (l->peer_caps & TIPC_GAP_ACK_BLOCK)
>>
>> - glen = tipc_build_gap_ack_blks(l, data, rcvgap);
>>
>> + glen = tipc_build_gap_ack_blks(l, hdr);
>>
>> tipc_mon_prep(l->net, data + glen, &dlen, mstate,
>> l->bearer_id);
>>
>> msg_set_size(hdr, INT_H_SIZE + glen + dlen);
>>
>> skb_trim(skb, INT_H_SIZE + glen + dlen);
>>
>> @@ -2027,20 +2137,19 @@ static int tipc_link_proto_rcv(struct
>> tipc_link *l, struct sk_buff *skb,
>>
>> {
>>
>> struct tipc_msg *hdr = buf_msg(skb);
>>
>> struct tipc_gap_ack_blks *ga = NULL;
>>
>> - u16 rcvgap = 0;
>>
>> - u16 ack = msg_ack(hdr);
>>
>> - u16 gap = msg_seq_gap(hdr);
>>
>> + bool reply = msg_probe(hdr), retransmitted = false;
>>
>> + u16 dlen = msg_data_sz(hdr), glen = 0;
>>
>> u16 peers_snd_nxt = msg_next_sent(hdr);
>>
>> u16 peers_tol = msg_link_tolerance(hdr);
>>
>> u16 peers_prio = msg_linkprio(hdr);
>>
>> + u16 gap = msg_seq_gap(hdr);
>>
>> + u16 ack = msg_ack(hdr);
>>
>> u16 rcv_nxt = l->rcv_nxt;
>>
>> - u16 dlen = msg_data_sz(hdr);
>>
>> + u16 rcvgap = 0;
>>
>> int mtyp = msg_type(hdr);
>>
>> - bool reply = msg_probe(hdr);
>>
>> - u16 glen = 0;
>>
>> - void *data;
>>
>> + int rc = 0, released;
>>
>> char *if_name;
>>
>> - int rc = 0;
>>
>> + void *data;
>>
>>
>> trace_tipc_proto_rcv(skb, false, l->name);
>>
>> if (tipc_link_is_blocked(l) || !xmitq)
>>
>> @@ -2137,13 +2246,7 @@ static int tipc_link_proto_rcv(struct
>> tipc_link *l, struct sk_buff *skb,
>>
>> }
>>
>>
>> /* Receive Gap ACK blocks from peer if any */
>>
>> - if (l->peer_caps & TIPC_GAP_ACK_BLOCK) {
>>
>> - ga = (struct tipc_gap_ack_blks *)data;
>>
>> - glen = ntohs(ga->len);
>>
>> - /* sanity check: if failed, ignore Gap ACK
>> blocks */
>>
>> - if (glen != tipc_gap_ack_blks_sz(ga->gack_cnt))
>>
>> - ga = NULL;
>>
>> - }
>>
>> + glen = tipc_get_gap_ack_blks(&ga, l, hdr, true);
>>
>>
>> tipc_mon_rcv(l->net, data + glen, dlen - glen, l->addr,
>>
>> &l->mon_state, l->bearer_id);
>>
>> @@ -2158,9 +2261,14 @@ static int tipc_link_proto_rcv(struct
>> tipc_link *l, struct sk_buff *skb,
>>
>> tipc_link_build_proto_msg(l, STATE_MSG, 0, reply,
>>
>> rcvgap, 0, 0, xmitq);
>>
>>
>> - rc |= tipc_link_advance_transmq(l, ack, gap, ga, xmitq);
>>
>> + released = tipc_link_advance_transmq(l, l, ack, gap, ga,
>> xmitq,
>>
>> + &retransmitted, &rc);
>>
>> if (gap)
>>
>> l->stats.recv_nacks++;
>>
>> + if (released || retransmitted)
>>
>> + tipc_link_update_cwin(l, released, retransmitted);
>>
>> + if (released)
>>
>> + tipc_link_advance_backlog(l, xmitq);
>>
>> if (unlikely(!skb_queue_empty(&l->wakeupq)))
>>
>> link_prepare_wakeup(l);
>>
>> }
>>
>> @@ -2246,10 +2354,7 @@ void tipc_link_bc_init_rcv(struct
>> tipc_link *l, struct tipc_msg *hdr)
>>
>> int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg
>> *hdr,
>>
>> struct sk_buff_head *xmitq)
>>
>> {
>>
>> - struct tipc_link *snd_l = l->bc_sndlink;
>>
>> u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
>>
>> - u16 from = msg_bcast_ack(hdr) + 1;
>>
>> - u16 to = from + msg_bc_gap(hdr) - 1;
>>
>> int rc = 0;
>>
>>
>> if (!link_is_up(l))
>>
>> @@ -2271,8 +2376,6 @@ int tipc_link_bc_sync_rcv(struct tipc_link
>> *l, struct tipc_msg *hdr,
>>
>> if (more(peers_snd_nxt, l->rcv_nxt + l->window))
>>
>> return rc;
>>
>>
>> - rc = tipc_link_bc_retrans(snd_l, l, from, to, xmitq);
>>
>> -
>>
>> l->snd_nxt = peers_snd_nxt;
>>
>> if (link_bc_rcv_gap(l))
>>
>> rc |= TIPC_LINK_SND_STATE;
>>
>> @@ -2307,38 +2410,28 @@ int tipc_link_bc_sync_rcv(struct
>> tipc_link *l, struct tipc_msg *hdr,
>>
>> return 0;
>>
>> }
>>
>>
>> -void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
>>
>> - struct sk_buff_head *xmitq)
>>
>> +int tipc_link_bc_ack_rcv(struct tipc_link *r, u16 acked, u16 gap,
>>
>> + struct tipc_gap_ack_blks *ga,
>>
>> + struct sk_buff_head *xmitq)
>>
>> {
>>
>> - struct sk_buff *skb, *tmp;
>>
>> - struct tipc_link *snd_l = l->bc_sndlink;
>>
>> + struct tipc_link *l = r->bc_sndlink;
>>
>> + bool unused = false;
>>
>> + int rc = 0;
>>
>>
>> - if (!link_is_up(l) || !l->bc_peer_is_up)
>>
>> - return;
>>
>> + if (!link_is_up(r) || !r->bc_peer_is_up)
>>
>> + return 0;
>>
>>
>> - if (!more(acked, l->acked))
>>
>> - return;
>>
>> + if (less(acked, r->acked) || (acked == r->acked && !gap && !ga))
>>
>> + return 0;
>>
>>
>> - trace_tipc_link_bc_ack(l, l->acked, acked, &snd_l->transmq);
>>
>> - /* Skip over packets peer has already acked */
>>
>> - skb_queue_walk(&snd_l->transmq, skb) {
>>
>> - if (more(buf_seqno(skb), l->acked))
>>
>> - break;
>>
>> - }
>>
>> + trace_tipc_link_bc_ack(r, r->acked, acked, &l->transmq);
>>
>> + tipc_link_advance_transmq(l, r, acked, gap, ga, xmitq, &unused,
>> &rc);
>>
>>
>> - /* Update/release the packets peer is acking now */
>>
>> - skb_queue_walk_from_safe(&snd_l->transmq, skb, tmp) {
>>
>> - if (more(buf_seqno(skb), acked))
>>
>> - break;
>>
>> - if (!--TIPC_SKB_CB(skb)->ackers) {
>>
>> - __skb_unlink(skb, &snd_l->transmq);
>>
>> - kfree_skb(skb);
>>
>> - }
>>
>> - }
>>
>> - l->acked = acked;
>>
>> - tipc_link_advance_backlog(snd_l, xmitq);
>>
>> - if (unlikely(!skb_queue_empty(&snd_l->wakeupq)))
>>
>> - link_prepare_wakeup(snd_l);
>>
>> + tipc_link_advance_backlog(l, xmitq);
>>
>> + if (unlikely(!skb_queue_empty(&l->wakeupq)))
>>
>> + link_prepare_wakeup(l);
>>
>> +
>>
>> + return rc;
>>
>> }
>>
>>
>> /* tipc_link_bc_nack_rcv(): receive broadcast nack message
>>
>> @@ -2366,8 +2459,7 @@ int tipc_link_bc_nack_rcv(struct tipc_link
>> *l, struct sk_buff *skb,
>>
>> return 0;
>>
>>
>> if (dnode == tipc_own_addr(l->net)) {
>>
>> - tipc_link_bc_ack_rcv(l, acked, xmitq);
>>
>> - rc = tipc_link_bc_retrans(l->bc_sndlink, l, from, to,
>> xmitq);
>>
>> + rc = tipc_link_bc_ack_rcv(l, acked, to - acked, NULL,
>> xmitq);
>>
>> l->stats.recv_nacks++;
>>
>> return rc;
>>
>> }
>>
>> diff --git a/net/tipc/link.h b/net/tipc/link.h
>>
>> index d3c1c3fc1659..0a0fa7350722 100644
>>
>> --- a/net/tipc/link.h
>>
>> +++ b/net/tipc/link.h
>>
>> @@ -143,8 +143,11 @@ int tipc_link_bc_peers(struct tipc_link *l);
>>
>> void tipc_link_set_mtu(struct tipc_link *l, int mtu);
>>
>> int tipc_link_mtu(struct tipc_link *l);
>>
>> int tipc_link_mss(struct tipc_link *l);
>>
>> -void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
>>
>> - struct sk_buff_head *xmitq);
>>
>> +u16 tipc_get_gap_ack_blks(struct tipc_gap_ack_blks **ga, struct
>> tipc_link *l,
>>
>> + struct tipc_msg *hdr, bool uc);
>>
>> +int tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked, u16 gap,
>>
>> + struct tipc_gap_ack_blks *ga,
>>
>> + struct sk_buff_head *xmitq);
>>
>> void tipc_link_build_bc_sync_msg(struct tipc_link *l,
>>
>> struct sk_buff_head *xmitq);
>>
>> void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg
>> *hdr);
>>
>> diff --git a/net/tipc/msg.h b/net/tipc/msg.h
>>
>> index 6d466ebdb64f..9a38f9c9d6eb 100644
>>
>> --- a/net/tipc/msg.h
>>
>> +++ b/net/tipc/msg.h
>>
>> @@ -160,20 +160,26 @@ struct tipc_gap_ack {
>>
>>
>> /* struct tipc_gap_ack_blks
>>
>> * @len: actual length of the record
>>
>> - * @gack_cnt: number of Gap ACK blocks in the record
>>
>> + * @bgack_cnt: number of Gap ACK blocks for broadcast in the
record
>>
>> + * @ugack_cnt: number of Gap ACK blocks for unicast (following
>> the broadcast
>>
>> + * ones)
>>
>> + * @start_index: starting index for "valid" broadcast Gap ACK
>> blocks
>>
>> * @gacks: array of Gap ACK blocks
>>
>> */
>>
>> struct tipc_gap_ack_blks {
>>
>> __be16 len;
>>
>> - u8 gack_cnt;
>>
>> - u8 reserved;
>>
>> + union {
>>
>> + u8 ugack_cnt;
>>
>> + u8 start_index;
>>
>> + };
>>
>> + u8 bgack_cnt;
>>
>> struct tipc_gap_ack gacks[];
>>
>> };
>>
>>
>> #define tipc_gap_ack_blks_sz(n) (sizeof(struct
>> tipc_gap_ack_blks) + \
>>
>> sizeof(struct tipc_gap_ack) * (n))
>>
>>
>> -#define MAX_GAP_ACK_BLKS 32
>>
>> +#define MAX_GAP_ACK_BLKS 128
>>
>> #define MAX_GAP_ACK_BLKS_SZ ��
>> tipc_gap_ack_blks_sz(MAX_GAP_ACK_BLKS)
>>
>>
>> static inline struct tipc_msg *buf_msg(struct sk_buff *skb)
>>
>> diff --git a/net/tipc/node.c b/net/tipc/node.c
>>
>> index 0c88778c88b5..eb6b62de81a7 100644
>>
>> --- a/net/tipc/node.c
>>
>> +++ b/net/tipc/node.c
>>
>> @@ -2069,10 +2069,16 @@ void tipc_rcv(struct net *net, struct
>> sk_buff *skb, struct tipc_bearer *b)
>>
>> le = &n->links[bearer_id];
>>
>>
>> /* Ensure broadcast reception is in synch with peer's send
>> state */
>>
>> - if (unlikely(usr == LINK_PROTOCOL))
>>
>> + if (unlikely(usr == LINK_PROTOCOL)) {
>>
>> + if (unlikely(skb_linearize(skb))) {
>>
>> + tipc_node_put(n);
>>
>> + goto discard;
>>
>> + }
>>
>> + hdr = buf_msg(skb);
>>
>> tipc_node_bc_sync_rcv(n, hdr, bearer_id, &xmitq);
>>
>> - else if (unlikely(tipc_link_acked(n->bc_entry.link) != bc_ack))
>>
>> + } else if (unlikely(tipc_link_acked(n->bc_entry.link) !=
>> bc_ack)) {
>>
>> tipc_bcast_ack_rcv(net, n->bc_entry.link, hdr);
>>
>> + }
>>
>>
>> /* Receive packet directly if conditions permit */
>>
>> tipc_node_read_lock(n);
>>
>> --
>>
>
>
> _______________________________________________
> tipc-discussion mailing list
> tipc-discussion@lists.sourceforge.net
<mailto:tipc-discussion@lists.sourceforge.net>
> https://lists.sourceforge.net/lists/listinfo/tipc-discussion
>