-----Original Message-----
From: Jon Maloy <jma...@redhat.com>
Sent: Monday, June 8, 2020 2:12 AM
To: Hoang Huu Le <hoang.h...@dektech.com.au>; ma...@donjonn.com;
ying....@windriver.com; tipc-discussion@lists.sourceforge.net
Subject: Re: [next-net v6] tipc: update a binding service via broadcast
On 6/6/20 11:10 PM, Hoang Huu Le wrote:
> -----Original Message-----
> From: Jon Maloy <jma...@redhat.com>
> Sent: Friday, June 5, 2020 8:03 PM
> To: Hoang Huu Le <hoang.h...@dektech.com.au>; ma...@donjonn.com;
> ying....@windriver.com; tipc-discussion@lists.sourceforge.net
> Subject: Re: [next-net v6] tipc: update a binding service via broadcast
>
>
>
> On 6/5/20 3:52 AM, Hoang Huu Le wrote:
>> Currently, updating binding table (add service binding to
>> name table/withdraw a service binding) is being sent over replicast.
>> However, if we are scaling up clusters to > 100 nodes/containers this
>> method is less affection because of looping through nodes in a cluster one
>> by one.
>>
[...]
> Still not needed. This queue should be flushed in
> tipc_node_lost_contact(), which I now see we don't do.
> [Hoang] Yes, that's right. I will verify and send it out.
Actually, this might explain the mysterious "Failed to withdraw"
printouts you observed during testing earlier.
Those withdraw items might be from a previous session just lingering in
the queue.
On the other hand, such a bug is so obvious and would have such grave
consequences (what if there are old 'publish' items in the queue?) that
I find it hard to believe that it can have remain unnoticed all this time.
Are you sure we are not cleaning up this queue somewhere else?
If it really is so we must also issue a correction patch to 'net' for
this issue.
[Hoang] Yes, I checked and already do the clean stuff at v2-3 in this feature
... So, we should apply this for 'net' too.
///jon
>
> This has to e fixed too.
> ///jon
>> + }
>> + }
>> + return NULL;
>> +}
>> +
>> /**
>> * tipc_named_rcv - process name table update messages sent by another
>> node
>> */
>> -void tipc_named_rcv(struct net *net, struct sk_buff_head *inputq)
>> +void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq,
>> + u16 *rcv_nxt, bool *open)
>> {
>> - struct tipc_net *tn = net_generic(net, tipc_net_id);
>> - struct tipc_msg *msg;
>> + struct tipc_net *tn = tipc_net(net);
>> struct distr_item *item;
>> - uint count;
>> - u32 node;
>> + struct tipc_msg *hdr;
>> struct sk_buff *skb;
>> - int mtype;
>> + u32 count, node = 0;
>>
>> spin_lock_bh(&tn->nametbl_lock);
>> - for (skb = skb_dequeue(inputq); skb; skb = skb_dequeue(inputq)) {
>> - skb_linearize(skb);
>> - msg = buf_msg(skb);
>> - mtype = msg_type(msg);
>> - item = (struct distr_item *)msg_data(msg);
>> - count = msg_data_sz(msg) / ITEM_SIZE;
>> - node = msg_orignode(msg);
>> + while ((skb = tipc_named_dequeue(namedq, rcv_nxt, open))) {
>> + hdr = buf_msg(skb);
>> + node = msg_orignode(hdr);
>> + item = (struct distr_item *)msg_data(hdr);
>> + count = msg_data_sz(hdr) / ITEM_SIZE;
>> while (count--) {
>> - tipc_update_nametbl(net, item, node, mtype);
>> + tipc_update_nametbl(net, item, node, msg_type(hdr));
>> item++;
>> }
>> kfree_skb(skb);
>> @@ -345,6 +402,6 @@ void tipc_named_reinit(struct net *net)
>> publ->node = self;
>> list_for_each_entry_rcu(publ, &nt->cluster_scope, binding_node)
>> publ->node = self;
>> -
>> + nt->rc_dests = 0;
>> spin_unlock_bh(&tn->nametbl_lock);
>> }
>> diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h
>> index 63fc73e0fa6c..092323158f06 100644
>> --- a/net/tipc/name_distr.h
>> +++ b/net/tipc/name_distr.h
>> @@ -67,11 +67,14 @@ struct distr_item {
>> __be32 key;
>> };
>>
>> +void tipc_named_bcast(struct net *net, struct sk_buff *skb);
>> struct sk_buff *tipc_named_publish(struct net *net, struct publication
>> *publ);
>> struct sk_buff *tipc_named_withdraw(struct net *net, struct publication
>> *publ);
>> -void tipc_named_node_up(struct net *net, u32 dnode);
>> -void tipc_named_rcv(struct net *net, struct sk_buff_head *msg_queue);
>> +void tipc_named_node_up(struct net *net, u32 dnode, u16 capabilities);
>> +void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq,
>> + u16 *rcv_nxt, bool *open);
>> void tipc_named_reinit(struct net *net);
>> -void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32
>> addr);
>> +void tipc_publ_notify(struct net *net, struct list_head *nsub_list,
>> + u32 addr, u16 capabilities);
>>
>> #endif
>> diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
>> index 359b2bc888cf..2ac33d32edc2 100644
>> --- a/net/tipc/name_table.c
>> +++ b/net/tipc/name_table.c
>> @@ -729,6 +729,7 @@ struct publication *tipc_nametbl_publish(struct net
>> *net, u32 type, u32 lower,
>> struct tipc_net *tn = tipc_net(net);
>> struct publication *p = NULL;
>> struct sk_buff *skb = NULL;
>> + u32 rc_dests;
>>
>> spin_lock_bh(&tn->nametbl_lock);
>>
>> @@ -743,12 +744,14 @@ struct publication *tipc_nametbl_publish(struct net
>> *net, u32 type, u32 lower,
>> nt->local_publ_count++;
>> skb = tipc_named_publish(net, p);
>> }
>> + rc_dests = nt->rc_dests;
>> exit:
>> spin_unlock_bh(&tn->nametbl_lock);
>>
>> if (skb)
>> - tipc_node_broadcast(net, skb);
>> + tipc_node_broadcast(net, skb, rc_dests);
>> return p;
>> +
>> }
>>
>> /**
>> @@ -762,6 +765,7 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32
>> lower,
>> u32 self = tipc_own_addr(net);
>> struct sk_buff *skb = NULL;
>> struct publication *p;
>> + u32 rc_dests;
>>
>> spin_lock_bh(&tn->nametbl_lock);
>>
>> @@ -775,10 +779,11 @@ int tipc_nametbl_withdraw(struct net *net, u32 type,
>> u32 lower,
>> pr_err("Failed to remove local publication {%u,%u,%u}/%u\n",
>> type, lower, upper, key);
>> }
>> + rc_dests = nt->rc_dests;
>> spin_unlock_bh(&tn->nametbl_lock);
>>
>> if (skb) {
>> - tipc_node_broadcast(net, skb);
>> + tipc_node_broadcast(net, skb, rc_dests);
>> return 1;
>> }
>> return 0;
>> diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
>> index 728bc7016c38..8064e1986e2c 100644
>> --- a/net/tipc/name_table.h
>> +++ b/net/tipc/name_table.h
>> @@ -106,6 +106,8 @@ struct name_table {
>> struct list_head cluster_scope;
>> rwlock_t cluster_scope_lock;
>> u32 local_publ_count;
>> + u32 rc_dests;
>> + u32 snd_nxt;
>> };
>>
>> int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback
>> *cb);
>> diff --git a/net/tipc/node.c b/net/tipc/node.c
>> index 803a3a6d0f50..ad8d7bce1f98 100644
>> --- a/net/tipc/node.c
>> +++ b/net/tipc/node.c
>> @@ -75,6 +75,8 @@ struct tipc_bclink_entry {
>> struct sk_buff_head arrvq;
>> struct sk_buff_head inputq2;
>> struct sk_buff_head namedq;
>> + u16 named_rcv_nxt;
>> + bool named_open;
>> };
>>
>> /**
>> @@ -396,10 +398,10 @@ static void tipc_node_write_unlock(struct tipc_node *n)
>> write_unlock_bh(&n->lock);
>>
>> if (flags & TIPC_NOTIFY_NODE_DOWN)
>> - tipc_publ_notify(net, publ_list, addr);
>> + tipc_publ_notify(net, publ_list, addr, n->capabilities);
>>
>> if (flags & TIPC_NOTIFY_NODE_UP)
>> - tipc_named_node_up(net, addr);
>> + tipc_named_node_up(net, addr, n->capabilities);
>>
>> if (flags & TIPC_NOTIFY_LINK_UP) {
>> tipc_mon_peer_up(net, addr, bearer_id);
>> @@ -1729,12 +1731,23 @@ int tipc_node_distr_xmit(struct net *net, struct
>> sk_buff_head *xmitq)
>> return 0;
>> }
>>
>> -void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
>> +void tipc_node_broadcast(struct net *net, struct sk_buff *skb, int rc_dests)
>> {
>> + struct sk_buff_head xmitq;
>> struct sk_buff *txskb;
>> struct tipc_node *n;
>> + u16 dummy;
>> u32 dst;
>>
>> + /* Use broadcast if all nodes support it */
>> + if (!rc_dests && tipc_bcast_get_mode(net) != BCLINK_MODE_RCAST) {
>> + __skb_queue_head_init(&xmitq);
>> + __skb_queue_tail(&xmitq, skb);
>> + tipc_bcast_xmit(net, &xmitq, &dummy);
>> + return;
>> + }
>> +
>> + /* Otherwise use legacy replicast method */
>> rcu_read_lock();
>> list_for_each_entry_rcu(n, tipc_nodes(net), list) {
>> dst = n->addr;
>> @@ -1749,7 +1762,6 @@ void tipc_node_broadcast(struct net *net, struct
>> sk_buff *skb)
>> tipc_node_xmit_skb(net, txskb, dst, 0);
>> }
>> rcu_read_unlock();
>> -
>> kfree_skb(skb);
>> }
>>
>> @@ -1844,7 +1856,9 @@ static void tipc_node_bc_rcv(struct net *net, struct
>> sk_buff *skb, int bearer_id
>>
>> /* Handle NAME_DISTRIBUTOR messages sent from 1.7 nodes */
>> if (!skb_queue_empty(&n->bc_entry.namedq))
>> - tipc_named_rcv(net, &n->bc_entry.namedq);
>> + tipc_named_rcv(net, &n->bc_entry.namedq,
>> + &n->bc_entry.named_rcv_nxt,
>> + &n->bc_entry.named_open);
>>
>> /* If reassembly or retransmission failure => reset all links to peer */
>> if (rc & TIPC_LINK_DOWN_EVT)
>> @@ -2109,7 +2123,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb,
>> struct tipc_bearer *b)
>> tipc_node_link_down(n, bearer_id, false);
>>
>> if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
>> - tipc_named_rcv(net, &n->bc_entry.namedq);
>> + tipc_named_rcv(net, &n->bc_entry.namedq,
>> + &n->bc_entry.named_rcv_nxt,
>> + &n->bc_entry.named_open);
>>
>> if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1)))
>> tipc_node_mcast_rcv(n);
>> diff --git a/net/tipc/node.h b/net/tipc/node.h
>> index a6803b449a2c..9f6f13f1604f 100644
>> --- a/net/tipc/node.h
>> +++ b/net/tipc/node.h
>> @@ -55,7 +55,8 @@ enum {
>> TIPC_MCAST_RBCTL = (1 << 7),
>> TIPC_GAP_ACK_BLOCK = (1 << 8),
>> TIPC_TUNNEL_ENHANCED = (1 << 9),
>> - TIPC_NAGLE = (1 << 10)
>> + TIPC_NAGLE = (1 << 10),
>> + TIPC_NAMED_BCAST = (1 << 11)
>> };
>>
>> #define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \
>> @@ -68,7 +69,8 @@ enum {
>> TIPC_MCAST_RBCTL | \
>> TIPC_GAP_ACK_BLOCK | \
>> TIPC_TUNNEL_ENHANCED | \
>> - TIPC_NAGLE)
>> + TIPC_NAGLE | \
>> + TIPC_NAMED_BCAST)
>>
>> #define INVALID_BEARER_ID -1
>>
>> @@ -101,7 +103,7 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff
>> *skb, u32 dest,
>> u32 selector);
>> void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32
>> addr);
>> void tipc_node_unsubscribe(struct net *net, struct list_head *subscr, u32
>> addr);
>> -void tipc_node_broadcast(struct net *net, struct sk_buff *skb);
>> +void tipc_node_broadcast(struct net *net, struct sk_buff *skb, int
>> rc_dests);
>> int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32
>> peer_port);
>> void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
>> int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel, bool connected);
_______________________________________________
tipc-discussion mailing list
tipc-discussion@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tipc-discussion