Thanks Jon! Hoang -----Original Message----- From: Jon Maloy <jma...@redhat.com> Sent: Monday, June 8, 2020 8:57 AM To: Hoang Huu Le <hoang.h...@dektech.com.au>; tipc-discussion@lists.sourceforge.net; tipc-dek <tipc-...@dektech.com.au> Subject: Re: [tipc-discussion] [net-next] tipc: update a binding service via broadcast
In case you are not aware of this, check this link before you send anything to net-next. http://vger.kernel.org/~davem/net-next.html ///jon On 6/7/20 9:50 PM, Hoang Huu Le wrote: > > -----Original Message----- > From: Jon Maloy <jma...@redhat.com> > Sent: Monday, June 8, 2020 2:14 AM > To: tipc-discussion@lists.sourceforge.net > Subject: Re: [tipc-discussion] [net-next] tipc: update a binding service via > broadcast > > > > On 6/7/20 3:03 PM, Jon Maloy wrote: >> >> On 6/7/20 12:24 AM, Hoang Huu Le wrote: >>> Currently, updating binding table (add service binding to >>> name table/withdraw a service binding) is being sent over replicast. >>> However, if we are scaling up clusters to > 100 nodes/containers this >>> method is less affection because of looping through nodes in a >>> cluster one >>> by one. >>> >>> It is worth to use broadcast to update a binding service. This way, the >>> binding table can be updated on all peer nodes in one shot. >>> >>> Broadcast is used when all peer nodes, as indicated by a new capability >>> flag TIPC_NAMED_BCAST, support reception of this message type. >>> >>> Four problems need to be considered when introducing this feature. >>> 1) When establishing a link to a new peer node we still update this by a >>> unicast 'bulk' update. This may lead to race conditions, where a later >>> broadcast publication/withdrawal bypass the 'bulk', resulting in >>> disordered publications, or even that a withdrawal may arrive before the >>> corresponding publication. We solve this by adding an 'is_last_bulk' bit >>> in the last bulk messages so that it can be distinguished from all other >>> messages. Only when this message has arrived do we open up for reception >>> of broadcast publications/withdrawals. >> Add a line feed between these paragraphs before you send the patch. >> Otherwise, still acked by me. >> >> ///jon > Oh, already posted... Just ignore my comment above. > [Hoang] net-next is closed. I will re-post the patch later with your > suggestion. > > ///jon >>> 2) When a first legacy node is added to the cluster all distribution >>> will switch over to use the legacy 'replicast' method, while the >>> opposite happens when the last legacy node leaves the cluster. This >>> entails another risk of message disordering that has to be handled. We >>> solve this by adding a sequence number to the broadcast/replicast >>> messages, so that disordering can be discovered and corrected. Note >>> however that we don't need to consider potential message loss or >>> duplication at this protocol level. >>> 3) Bulk messages don't contain any sequence numbers, and will always >>> arrive in order. Hence we must exempt those from the sequence number >>> control and deliver them unconditionally. We solve this by adding a new >>> 'is_bulk' bit in those messages so that they can be recognized. >>> 4) Legacy messages, which don't contain any new bits or sequence >>> numbers, but neither can arrive out of order, also need to be exempt >>> from the initial synchronization and sequence number check, and >>> delivered unconditionally. Therefore, we add another 'is_not_legacy' bit >>> to all new messages so that those can be distinguished from legacy >>> messages and the latter delivered directly. >>> >>> Signed-off-by: Hoang Huu Le <hoang.h...@dektech.com.au> >>> Acked-by: Jon Maloy <jma...@redhat.com> >>> --- >>> net/tipc/bcast.c | 6 +-- >>> net/tipc/bcast.h | 4 +- >>> net/tipc/link.c | 2 +- >>> net/tipc/msg.h | 40 ++++++++++++++++ >>> net/tipc/name_distr.c | 109 +++++++++++++++++++++++++++++++----------- >>> net/tipc/name_distr.h | 9 ++-- >>> net/tipc/name_table.c | 9 +++- >>> net/tipc/name_table.h | 2 + >>> net/tipc/node.c | 29 ++++++++--- >>> net/tipc/node.h | 8 ++-- >>> 10 files changed, 170 insertions(+), 48 deletions(-) >>> >>> diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c >>> index 383f87bc1061..940d176e0e87 100644 >>> --- a/net/tipc/bcast.c >>> +++ b/net/tipc/bcast.c >>> @@ -250,8 +250,8 @@ static void tipc_bcast_select_xmit_method(struct >>> net *net, int dests, >>> * Consumes the buffer chain. >>> * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE >>> */ >>> -static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts, >>> - u16 *cong_link_cnt) >>> +int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts, >>> + u16 *cong_link_cnt) >>> { >>> struct tipc_link *l = tipc_bc_sndlink(net); >>> struct sk_buff_head xmitq; >>> @@ -752,7 +752,7 @@ void tipc_nlist_purge(struct tipc_nlist *nl) >>> nl->local = false; >>> } >>> -u32 tipc_bcast_get_broadcast_mode(struct net *net) >>> +u32 tipc_bcast_get_mode(struct net *net) >>> { >>> struct tipc_bc_base *bb = tipc_bc_base(net); >>> diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h >>> index 4240c95188b1..2d9352dc7b0e 100644 >>> --- a/net/tipc/bcast.h >>> +++ b/net/tipc/bcast.h >>> @@ -90,6 +90,8 @@ void tipc_bcast_toggle_rcast(struct net *net, bool >>> supp); >>> int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts, >>> struct tipc_mc_method *method, struct tipc_nlist *dests, >>> u16 *cong_link_cnt); >>> +int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts, >>> + u16 *cong_link_cnt); >>> int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct >>> sk_buff *skb); >>> void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, >>> struct tipc_msg *hdr); >>> @@ -101,7 +103,7 @@ int tipc_nl_add_bc_link(struct net *net, struct >>> tipc_nl_msg *msg, >>> int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]); >>> int tipc_bclink_reset_stats(struct net *net, struct tipc_link *l); >>> -u32 tipc_bcast_get_broadcast_mode(struct net *net); >>> +u32 tipc_bcast_get_mode(struct net *net); >>> u32 tipc_bcast_get_broadcast_ratio(struct net *net); >>> void tipc_mcast_filter_msg(struct net *net, struct sk_buff_head >>> *defq, >>> diff --git a/net/tipc/link.c b/net/tipc/link.c >>> index ee3b8d0576b8..eac89a3e22ce 100644 >>> --- a/net/tipc/link.c >>> +++ b/net/tipc/link.c >>> @@ -2745,7 +2745,7 @@ int tipc_nl_add_bc_link(struct net *net, struct >>> tipc_nl_msg *msg, >>> void *hdr; >>> struct nlattr *attrs; >>> struct nlattr *prop; >>> - u32 bc_mode = tipc_bcast_get_broadcast_mode(net); >>> + u32 bc_mode = tipc_bcast_get_mode(net); >>> u32 bc_ratio = tipc_bcast_get_broadcast_ratio(net); >>> if (!bcl) >>> diff --git a/net/tipc/msg.h b/net/tipc/msg.h >>> index 58660d56bc83..65119e81ff0c 100644 >>> --- a/net/tipc/msg.h >>> +++ b/net/tipc/msg.h >>> @@ -438,6 +438,36 @@ static inline void msg_set_errcode(struct >>> tipc_msg *m, u32 err) >>> msg_set_bits(m, 1, 25, 0xf, err); >>> } >>> +static inline void msg_set_bulk(struct tipc_msg *m) >>> +{ >>> + msg_set_bits(m, 1, 28, 0x1, 1); >>> +} >>> + >>> +static inline u32 msg_is_bulk(struct tipc_msg *m) >>> +{ >>> + return msg_bits(m, 1, 28, 0x1); >>> +} >>> + >>> +static inline void msg_set_last_bulk(struct tipc_msg *m) >>> +{ >>> + msg_set_bits(m, 1, 27, 0x1, 1); >>> +} >>> + >>> +static inline u32 msg_is_last_bulk(struct tipc_msg *m) >>> +{ >>> + return msg_bits(m, 1, 27, 0x1); >>> +} >>> + >>> +static inline void msg_set_non_legacy(struct tipc_msg *m) >>> +{ >>> + msg_set_bits(m, 1, 26, 0x1, 1); >>> +} >>> + >>> +static inline u32 msg_is_legacy(struct tipc_msg *m) >>> +{ >>> + return !msg_bits(m, 1, 26, 0x1); >>> +} >>> + >>> static inline u32 msg_reroute_cnt(struct tipc_msg *m) >>> { >>> return msg_bits(m, 1, 21, 0xf); >>> @@ -567,6 +597,16 @@ static inline void msg_set_origport(struct >>> tipc_msg *m, u32 p) >>> msg_set_word(m, 4, p); >>> } >>> +static inline u16 msg_named_seqno(struct tipc_msg *m) >>> +{ >>> + return msg_bits(m, 4, 0, 0xffff); >>> +} >>> + >>> +static inline void msg_set_named_seqno(struct tipc_msg *m, u16 n) >>> +{ >>> + msg_set_bits(m, 4, 0, 0xffff, n); >>> +} >>> + >>> static inline u32 msg_destport(struct tipc_msg *m) >>> { >>> return msg_word(m, 5); >>> diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c >>> index 5feaf3b67380..481d480609f0 100644 >>> --- a/net/tipc/name_distr.c >>> +++ b/net/tipc/name_distr.c >>> @@ -102,7 +102,8 @@ struct sk_buff *tipc_named_publish(struct net >>> *net, struct publication *publ) >>> pr_warn("Publication distribution failure\n"); >>> return NULL; >>> } >>> - >>> + msg_set_named_seqno(buf_msg(skb), nt->snd_nxt++); >>> + msg_set_non_legacy(buf_msg(skb)); >>> item = (struct distr_item *)msg_data(buf_msg(skb)); >>> publ_to_item(item, publ); >>> return skb; >>> @@ -114,8 +115,8 @@ struct sk_buff *tipc_named_publish(struct net >>> *net, struct publication *publ) >>> struct sk_buff *tipc_named_withdraw(struct net *net, struct >>> publication *publ) >>> { >>> struct name_table *nt = tipc_name_table(net); >>> - struct sk_buff *buf; >>> struct distr_item *item; >>> + struct sk_buff *skb; >>> write_lock_bh(&nt->cluster_scope_lock); >>> list_del(&publ->binding_node); >>> @@ -123,15 +124,16 @@ struct sk_buff *tipc_named_withdraw(struct net >>> *net, struct publication *publ) >>> if (publ->scope == TIPC_NODE_SCOPE) >>> return NULL; >>> - buf = named_prepare_buf(net, WITHDRAWAL, ITEM_SIZE, 0); >>> - if (!buf) { >>> + skb = named_prepare_buf(net, WITHDRAWAL, ITEM_SIZE, 0); >>> + if (!skb) { >>> pr_warn("Withdrawal distribution failure\n"); >>> return NULL; >>> } >>> - >>> - item = (struct distr_item *)msg_data(buf_msg(buf)); >>> + msg_set_named_seqno(buf_msg(skb), nt->snd_nxt++); >>> + msg_set_non_legacy(buf_msg(skb)); >>> + item = (struct distr_item *)msg_data(buf_msg(skb)); >>> publ_to_item(item, publ); >>> - return buf; >>> + return skb; >>> } >>> /** >>> @@ -141,7 +143,7 @@ struct sk_buff *tipc_named_withdraw(struct net >>> *net, struct publication *publ) >>> * @pls: linked list of publication items to be packed into buffer >>> chain >>> */ >>> static void named_distribute(struct net *net, struct sk_buff_head >>> *list, >>> - u32 dnode, struct list_head *pls) >>> + u32 dnode, struct list_head *pls, u16 seqno) >>> { >>> struct publication *publ; >>> struct sk_buff *skb = NULL; >>> @@ -149,6 +151,7 @@ static void named_distribute(struct net *net, >>> struct sk_buff_head *list, >>> u32 msg_dsz = ((tipc_node_get_mtu(net, dnode, 0, false) - >>> INT_H_SIZE) / >>> ITEM_SIZE) * ITEM_SIZE; >>> u32 msg_rem = msg_dsz; >>> + struct tipc_msg *hdr; >>> list_for_each_entry(publ, pls, binding_node) { >>> /* Prepare next buffer: */ >>> @@ -159,8 +162,11 @@ static void named_distribute(struct net *net, >>> struct sk_buff_head *list, >>> pr_warn("Bulk publication failure\n"); >>> return; >>> } >>> - msg_set_bc_ack_invalid(buf_msg(skb), true); >>> - item = (struct distr_item *)msg_data(buf_msg(skb)); >>> + hdr = buf_msg(skb); >>> + msg_set_bc_ack_invalid(hdr, true); >>> + msg_set_bulk(hdr); >>> + msg_set_non_legacy(hdr); >>> + item = (struct distr_item *)msg_data(hdr); >>> } >>> /* Pack publication into message: */ >>> @@ -176,24 +182,35 @@ static void named_distribute(struct net *net, >>> struct sk_buff_head *list, >>> } >>> } >>> if (skb) { >>> - msg_set_size(buf_msg(skb), INT_H_SIZE + (msg_dsz - msg_rem)); >>> + hdr = buf_msg(skb); >>> + msg_set_size(hdr, INT_H_SIZE + (msg_dsz - msg_rem)); >>> skb_trim(skb, INT_H_SIZE + (msg_dsz - msg_rem)); >>> __skb_queue_tail(list, skb); >>> } >>> + hdr = buf_msg(skb_peek_tail(list)); >>> + msg_set_last_bulk(hdr); >>> + msg_set_named_seqno(hdr, seqno); >>> } >>> /** >>> * tipc_named_node_up - tell specified node about all publications >>> by this node >>> */ >>> -void tipc_named_node_up(struct net *net, u32 dnode) >>> +void tipc_named_node_up(struct net *net, u32 dnode, u16 capabilities) >>> { >>> struct name_table *nt = tipc_name_table(net); >>> + struct tipc_net *tn = tipc_net(net); >>> struct sk_buff_head head; >>> + u16 seqno; >>> __skb_queue_head_init(&head); >>> + spin_lock_bh(&tn->nametbl_lock); >>> + if (!(capabilities & TIPC_NAMED_BCAST)) >>> + nt->rc_dests++; >>> + seqno = nt->snd_nxt; >>> + spin_unlock_bh(&tn->nametbl_lock); >>> read_lock_bh(&nt->cluster_scope_lock); >>> - named_distribute(net, &head, dnode, &nt->cluster_scope); >>> + named_distribute(net, &head, dnode, &nt->cluster_scope, seqno); >>> tipc_node_xmit(net, &head, dnode, 0); >>> read_unlock_bh(&nt->cluster_scope_lock); >>> } >>> @@ -245,13 +262,21 @@ static void tipc_dist_queue_purge(struct net >>> *net, u32 addr) >>> spin_unlock_bh(&tn->nametbl_lock); >>> } >>> -void tipc_publ_notify(struct net *net, struct list_head >>> *nsub_list, u32 addr) >>> +void tipc_publ_notify(struct net *net, struct list_head *nsub_list, >>> + u32 addr, u16 capabilities) >>> { >>> + struct name_table *nt = tipc_name_table(net); >>> + struct tipc_net *tn = tipc_net(net); >>> + >>> struct publication *publ, *tmp; >>> list_for_each_entry_safe(publ, tmp, nsub_list, binding_node) >>> tipc_publ_purge(net, publ, addr); >>> tipc_dist_queue_purge(net, addr); >>> + spin_lock_bh(&tn->nametbl_lock); >>> + if (!(capabilities & TIPC_NAMED_BCAST)) >>> + nt->rc_dests--; >>> + spin_unlock_bh(&tn->nametbl_lock); >>> } >>> /** >>> @@ -295,29 +320,55 @@ static bool tipc_update_nametbl(struct net >>> *net, struct distr_item *i, >>> return false; >>> } >>> +struct sk_buff *tipc_named_dequeue(struct sk_buff_head *namedq, >>> + u16 *rcv_nxt, bool *open) >>> +{ >>> + struct sk_buff *skb, *tmp; >>> + struct tipc_msg *hdr; >>> + u16 seqno; >>> + >>> + skb_queue_walk_safe(namedq, skb, tmp) { >>> + skb_linearize(skb); >>> + hdr = buf_msg(skb); >>> + seqno = msg_named_seqno(hdr); >>> + if (msg_is_last_bulk(hdr)) { >>> + *rcv_nxt = seqno; >>> + *open = true; >>> + } >>> + if (msg_is_bulk(hdr) || msg_is_legacy(hdr)) { >>> + __skb_unlink(skb, namedq); >>> + return skb; >>> + } >>> + >>> + if (*open && (*rcv_nxt == seqno)) { >>> + (*rcv_nxt)++; >>> + __skb_unlink(skb, namedq); >>> + return skb; >>> + } >>> + } >>> + return NULL; >>> +} >>> + >>> /** >>> * tipc_named_rcv - process name table update messages sent by >>> another node >>> */ >>> -void tipc_named_rcv(struct net *net, struct sk_buff_head *inputq) >>> +void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq, >>> + u16 *rcv_nxt, bool *open) >>> { >>> - struct tipc_net *tn = net_generic(net, tipc_net_id); >>> - struct tipc_msg *msg; >>> + struct tipc_net *tn = tipc_net(net); >>> struct distr_item *item; >>> - uint count; >>> - u32 node; >>> + struct tipc_msg *hdr; >>> struct sk_buff *skb; >>> - int mtype; >>> + u32 count, node = 0; >>> spin_lock_bh(&tn->nametbl_lock); >>> - for (skb = skb_dequeue(inputq); skb; skb = skb_dequeue(inputq)) { >>> - skb_linearize(skb); >>> - msg = buf_msg(skb); >>> - mtype = msg_type(msg); >>> - item = (struct distr_item *)msg_data(msg); >>> - count = msg_data_sz(msg) / ITEM_SIZE; >>> - node = msg_orignode(msg); >>> + while ((skb = tipc_named_dequeue(namedq, rcv_nxt, open))) { >>> + hdr = buf_msg(skb); >>> + node = msg_orignode(hdr); >>> + item = (struct distr_item *)msg_data(hdr); >>> + count = msg_data_sz(hdr) / ITEM_SIZE; >>> while (count--) { >>> - tipc_update_nametbl(net, item, node, mtype); >>> + tipc_update_nametbl(net, item, node, msg_type(hdr)); >>> item++; >>> } >>> kfree_skb(skb); >>> @@ -345,6 +396,6 @@ void tipc_named_reinit(struct net *net) >>> publ->node = self; >>> list_for_each_entry_rcu(publ, &nt->cluster_scope, binding_node) >>> publ->node = self; >>> - >>> + nt->rc_dests = 0; >>> spin_unlock_bh(&tn->nametbl_lock); >>> } >>> diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h >>> index 63fc73e0fa6c..092323158f06 100644 >>> --- a/net/tipc/name_distr.h >>> +++ b/net/tipc/name_distr.h >>> @@ -67,11 +67,14 @@ struct distr_item { >>> __be32 key; >>> }; >>> +void tipc_named_bcast(struct net *net, struct sk_buff *skb); >>> struct sk_buff *tipc_named_publish(struct net *net, struct >>> publication *publ); >>> struct sk_buff *tipc_named_withdraw(struct net *net, struct >>> publication *publ); >>> -void tipc_named_node_up(struct net *net, u32 dnode); >>> -void tipc_named_rcv(struct net *net, struct sk_buff_head *msg_queue); >>> +void tipc_named_node_up(struct net *net, u32 dnode, u16 capabilities); >>> +void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq, >>> + u16 *rcv_nxt, bool *open); >>> void tipc_named_reinit(struct net *net); >>> -void tipc_publ_notify(struct net *net, struct list_head *nsub_list, >>> u32 addr); >>> +void tipc_publ_notify(struct net *net, struct list_head *nsub_list, >>> + u32 addr, u16 capabilities); >>> #endif >>> diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c >>> index 359b2bc888cf..2ac33d32edc2 100644 >>> --- a/net/tipc/name_table.c >>> +++ b/net/tipc/name_table.c >>> @@ -729,6 +729,7 @@ struct publication *tipc_nametbl_publish(struct >>> net *net, u32 type, u32 lower, >>> struct tipc_net *tn = tipc_net(net); >>> struct publication *p = NULL; >>> struct sk_buff *skb = NULL; >>> + u32 rc_dests; >>> spin_lock_bh(&tn->nametbl_lock); >>> @@ -743,12 +744,14 @@ struct publication >>> *tipc_nametbl_publish(struct net *net, u32 type, u32 lower, >>> nt->local_publ_count++; >>> skb = tipc_named_publish(net, p); >>> } >>> + rc_dests = nt->rc_dests; >>> exit: >>> spin_unlock_bh(&tn->nametbl_lock); >>> if (skb) >>> - tipc_node_broadcast(net, skb); >>> + tipc_node_broadcast(net, skb, rc_dests); >>> return p; >>> + >>> } >>> /** >>> @@ -762,6 +765,7 @@ int tipc_nametbl_withdraw(struct net *net, u32 >>> type, u32 lower, >>> u32 self = tipc_own_addr(net); >>> struct sk_buff *skb = NULL; >>> struct publication *p; >>> + u32 rc_dests; >>> spin_lock_bh(&tn->nametbl_lock); >>> @@ -775,10 +779,11 @@ int tipc_nametbl_withdraw(struct net *net, >>> u32 type, u32 lower, >>> pr_err("Failed to remove local publication {%u,%u,%u}/%u\n", >>> type, lower, upper, key); >>> } >>> + rc_dests = nt->rc_dests; >>> spin_unlock_bh(&tn->nametbl_lock); >>> if (skb) { >>> - tipc_node_broadcast(net, skb); >>> + tipc_node_broadcast(net, skb, rc_dests); >>> return 1; >>> } >>> return 0; >>> diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h >>> index 728bc7016c38..8064e1986e2c 100644 >>> --- a/net/tipc/name_table.h >>> +++ b/net/tipc/name_table.h >>> @@ -106,6 +106,8 @@ struct name_table { >>> struct list_head cluster_scope; >>> rwlock_t cluster_scope_lock; >>> u32 local_publ_count; >>> + u32 rc_dests; >>> + u32 snd_nxt; >>> }; >>> int tipc_nl_name_table_dump(struct sk_buff *skb, struct >>> netlink_callback *cb); >>> diff --git a/net/tipc/node.c b/net/tipc/node.c >>> index a4c2816c3746..030a51c4d1fa 100644 >>> --- a/net/tipc/node.c >>> +++ b/net/tipc/node.c >>> @@ -75,6 +75,8 @@ struct tipc_bclink_entry { >>> struct sk_buff_head arrvq; >>> struct sk_buff_head inputq2; >>> struct sk_buff_head namedq; >>> + u16 named_rcv_nxt; >>> + bool named_open; >>> }; >>> /** >>> @@ -396,10 +398,10 @@ static void tipc_node_write_unlock(struct >>> tipc_node *n) >>> write_unlock_bh(&n->lock); >>> if (flags & TIPC_NOTIFY_NODE_DOWN) >>> - tipc_publ_notify(net, publ_list, addr); >>> + tipc_publ_notify(net, publ_list, addr, n->capabilities); >>> if (flags & TIPC_NOTIFY_NODE_UP) >>> - tipc_named_node_up(net, addr); >>> + tipc_named_node_up(net, addr, n->capabilities); >>> if (flags & TIPC_NOTIFY_LINK_UP) { >>> tipc_mon_peer_up(net, addr, bearer_id); >>> @@ -1483,6 +1485,7 @@ static void node_lost_contact(struct tipc_node *n, >>> /* Clean up broadcast state */ >>> tipc_bcast_remove_peer(n->net, n->bc_entry.link); >>> + __skb_queue_purge(&n->bc_entry.namedq); >>> /* Abort any ongoing link failover */ >>> for (i = 0; i < MAX_BEARERS; i++) { >>> @@ -1729,12 +1732,23 @@ int tipc_node_distr_xmit(struct net *net, >>> struct sk_buff_head *xmitq) >>> return 0; >>> } >>> -void tipc_node_broadcast(struct net *net, struct sk_buff *skb) >>> +void tipc_node_broadcast(struct net *net, struct sk_buff *skb, int >>> rc_dests) >>> { >>> + struct sk_buff_head xmitq; >>> struct sk_buff *txskb; >>> struct tipc_node *n; >>> + u16 dummy; >>> u32 dst; >>> + /* Use broadcast if all nodes support it */ >>> + if (!rc_dests && tipc_bcast_get_mode(net) != BCLINK_MODE_RCAST) { >>> + __skb_queue_head_init(&xmitq); >>> + __skb_queue_tail(&xmitq, skb); >>> + tipc_bcast_xmit(net, &xmitq, &dummy); >>> + return; >>> + } >>> + >>> + /* Otherwise use legacy replicast method */ >>> rcu_read_lock(); >>> list_for_each_entry_rcu(n, tipc_nodes(net), list) { >>> dst = n->addr; >>> @@ -1749,7 +1763,6 @@ void tipc_node_broadcast(struct net *net, >>> struct sk_buff *skb) >>> tipc_node_xmit_skb(net, txskb, dst, 0); >>> } >>> rcu_read_unlock(); >>> - >>> kfree_skb(skb); >>> } >>> @@ -1844,7 +1857,9 @@ static void tipc_node_bc_rcv(struct net *net, >>> struct sk_buff *skb, int bearer_id >>> /* Handle NAME_DISTRIBUTOR messages sent from 1.7 nodes */ >>> if (!skb_queue_empty(&n->bc_entry.namedq)) >>> - tipc_named_rcv(net, &n->bc_entry.namedq); >>> + tipc_named_rcv(net, &n->bc_entry.namedq, >>> + &n->bc_entry.named_rcv_nxt, >>> + &n->bc_entry.named_open); >>> /* If reassembly or retransmission failure => reset all links >>> to peer */ >>> if (rc & TIPC_LINK_DOWN_EVT) >>> @@ -2114,7 +2129,9 @@ void tipc_rcv(struct net *net, struct sk_buff >>> *skb, struct tipc_bearer *b) >>> tipc_node_link_down(n, bearer_id, false); >>> if (unlikely(!skb_queue_empty(&n->bc_entry.namedq))) >>> - tipc_named_rcv(net, &n->bc_entry.namedq); >>> + tipc_named_rcv(net, &n->bc_entry.namedq, >>> + &n->bc_entry.named_rcv_nxt, >>> + &n->bc_entry.named_open); >>> if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1))) >>> tipc_node_mcast_rcv(n); >>> diff --git a/net/tipc/node.h b/net/tipc/node.h >>> index a6803b449a2c..9f6f13f1604f 100644 >>> --- a/net/tipc/node.h >>> +++ b/net/tipc/node.h >>> @@ -55,7 +55,8 @@ enum { >>> TIPC_MCAST_RBCTL = (1 << 7), >>> TIPC_GAP_ACK_BLOCK = (1 << 8), >>> TIPC_TUNNEL_ENHANCED = (1 << 9), >>> - TIPC_NAGLE = (1 << 10) >>> + TIPC_NAGLE = (1 << 10), >>> + TIPC_NAMED_BCAST = (1 << 11) >>> }; >>> #define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \ >>> @@ -68,7 +69,8 @@ enum { >>> TIPC_MCAST_RBCTL | \ >>> TIPC_GAP_ACK_BLOCK | \ >>> TIPC_TUNNEL_ENHANCED | \ >>> - TIPC_NAGLE) >>> + TIPC_NAGLE | \ >>> + TIPC_NAMED_BCAST) >>> #define INVALID_BEARER_ID -1 >>> @@ -101,7 +103,7 @@ int tipc_node_xmit_skb(struct net *net, struct >>> sk_buff *skb, u32 dest, >>> u32 selector); >>> void tipc_node_subscribe(struct net *net, struct list_head *subscr, >>> u32 addr); >>> void tipc_node_unsubscribe(struct net *net, struct list_head >>> *subscr, u32 addr); >>> -void tipc_node_broadcast(struct net *net, struct sk_buff *skb); >>> +void tipc_node_broadcast(struct net *net, struct sk_buff *skb, int >>> rc_dests); >>> int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 >>> peer_port); >>> void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port); >>> int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel, bool >>> connected); >> >> >> _______________________________________________ >> tipc-discussion mailing list >> tipc-discussion@lists.sourceforge.net >> https://lists.sourceforge.net/lists/listinfo/tipc-discussion >> > > > _______________________________________________ > tipc-discussion mailing list > tipc-discussion@lists.sourceforge.net > https://lists.sourceforge.net/lists/listinfo/tipc-discussion _______________________________________________ tipc-discussion mailing list tipc-discussion@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/tipc-discussion