-----Original Message-----
From: Jon Maloy <jma...@redhat.com> 
Sent: Monday, June 8, 2020 8:33 PM
To: Hoang Huu Le <hoang.h...@dektech.com.au>; ma...@donjonn.com; 
ying....@windriver.com; tipc-discussion@lists.sourceforge.net
Subject: Re: [next-net v6] tipc: update a binding service via broadcast



On 6/6/20 11:10 PM, Hoang Huu Le wrote:
> -----Original Message-----
> From: Jon Maloy <jma...@redhat.com>
> Sent: Friday, June 5, 2020 8:03 PM
> To: Hoang Huu Le <hoang.h...@dektech.com.au>; ma...@donjonn.com; 
> ying....@windriver.com; tipc-discussion@lists.sourceforge.net
> Subject: Re: [next-net v6] tipc: update a binding service via broadcast
>
>
>
> On 6/5/20 3:52 AM, Hoang Huu Le wrote:
>> Currently, updating binding table (add service binding to
>> name table/withdraw a service binding) is being sent over replicast.
>> However, if we are scaling up clusters to > 100 nodes/containers this
>> method is less affection because of looping through nodes in a cluster one
>> by one.
[...]
>> +            if (*open && (*rcv_nxt == seqno)) {
>> +                    (*rcv_nxt)++;
>> +                    __skb_unlink(skb, namedq);
>> +                    return skb;
>> +            }
>> +
>> +            if (less(seqno, *rcv_nxt)) {
>> +                    __skb_unlink(skb, namedq);
>> +                    kfree_skb(skb);
>> +                    continue;
> Still not needed. This queue should be flushed in
> tipc_node_lost_contact(), which I now see we don't do.
> [Hoang] Yes, that's right. I will verify and send it out.
>
> This has to e fixed too.
> ///jon
I hate to admit it, but we might actually need this test after all. 
Imagine that somebody has done 'publish' just after the broadcast link 
came up (in tipc_bcast_add_peer()) , but before tipc_named_node_up() is 
called. The context of those two calls is not atomic, so I think it is 
possible that this publication might end up both in the bcast_link 
backlog queue and in the bulk distribution.
This publication message will have a named_seqno that is lower than the 
agreed synch point, and should be dropped at reception.

Given the crucial role of the binding table for the overall TIPC 
functionality I think it is better be safe than sorry here, and keep 
this test.
[Hoang] Finally, I'm able to reproduce the problem as same as above scene:
<code>
357                 if (less(seqno, *rcv_nxt)) {
358                         pr_info("DROP[%x->%x]: %s blk %d lblk %d nxt %d 
legacy %d seqno %u bc %u hsz %u dsz %u qlen %u\n",
359                                 msg_orignode(hdr), tipc_own_addr(net),
360                                 msg_type(hdr) == PUBLICATION ? 
"PUBL":"DRAW",
361                                 msg_is_bulk(hdr), msg_is_last_bulk(hdr),
362                                 *rcv_nxt, msg_is_legacy(hdr),
363                                 msg_named_seqno(hdr), msg_non_seq(hdr),
364                                 msg_hdr_sz(hdr), msg_data_sz(hdr),
365                                 skb_queue_len(namedq));
366
367                         __skb_unlink(skb, namedq);
368                         kfree_skb(skb);
369                         continue;
370                 }
</code>
-----------
[12528.036895] tipc: Established link <1001024:eth0-1001001:brtipc> on network 
plane A
[12528.043857] tipc: Established link <1001002:brtipc-1001001:brtipc> on 
network plane A
[12528.136462] tipc: DROP[1001001->1001002]: DRAW blk 0 lblk 0 nxt 3895 legacy 
0 seqno 3878 bc 0 hsz 40 dsz 20 qlen 23
[12528.140864] tipc: DROP[1001001->1001002]: DRAW blk 0 lblk 0 nxt 3895 legacy 
0 seqno 3879 bc 0 hsz 40 dsz 20 qlen 22
[...]
[12528.210959] tipc: DROP[1001001->1001002]: DRAW blk 0 lblk 0 nxt 3895 legacy 
0 seqno 3893 bc 0 hsz 40 dsz 20 qlen 8
[12528.218903] tipc: DROP[1001001->1001002]: DRAW blk 0 lblk 0 nxt 3895 legacy 
0 seqno 3894 bc 0 hsz 40 dsz 20 qlen 7
[12528.227214] tipc: DROP[1001001->1001024]: DRAW blk 0 lblk 0 nxt 3895 legacy 
0 seqno 3878 bc 0 hsz 40 dsz 20 qlen 23
[12528.231285] tipc: DROP[1001001->1001024]: DRAW blk 0 lblk 0 nxt 3895 legacy 
0 seqno 3879 bc 0 hsz 40 dsz 20 qlen 22
[...]
[12528.277445] tipc: DROP[1001001->1001024]: DRAW blk 0 lblk 0 nxt 3895 legacy 
0 seqno 3893 bc 0 hsz 40 dsz 20 qlen 8
[12528.280847] tipc: DROP[1001001->1001024]: DRAW blk 0 lblk 0 nxt 3895 legacy 
0 seqno 3894 bc 0 hsz 40 dsz 20 qlen 7 
---  
I will re-post the patch including the test as well.

///jon

>> +            }
>> +    }
>> +    return NULL;
>> +}
>> +
>>    /**
>>     * tipc_named_rcv - process name table update messages sent by another 
>> node
>>     */
>> -void tipc_named_rcv(struct net *net, struct sk_buff_head *inputq)
>> +void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq,
>> +                u16 *rcv_nxt, bool *open)
>>    {
>> -    struct tipc_net *tn = net_generic(net, tipc_net_id);
>> -    struct tipc_msg *msg;
>> +    struct tipc_net *tn = tipc_net(net);
>>      struct distr_item *item;
>> -    uint count;
>> -    u32 node;
>> +    struct tipc_msg *hdr;
>>      struct sk_buff *skb;
>> -    int mtype;
>> +    u32 count, node = 0;
>>    
>>      spin_lock_bh(&tn->nametbl_lock);
>> -    for (skb = skb_dequeue(inputq); skb; skb = skb_dequeue(inputq)) {
>> -            skb_linearize(skb);
>> -            msg = buf_msg(skb);
>> -            mtype = msg_type(msg);
>> -            item = (struct distr_item *)msg_data(msg);
>> -            count = msg_data_sz(msg) / ITEM_SIZE;
>> -            node = msg_orignode(msg);
>> +    while ((skb = tipc_named_dequeue(namedq, rcv_nxt, open))) {
>> +            hdr = buf_msg(skb);
>> +            node = msg_orignode(hdr);
>> +            item = (struct distr_item *)msg_data(hdr);
>> +            count = msg_data_sz(hdr) / ITEM_SIZE;
>>              while (count--) {
>> -                    tipc_update_nametbl(net, item, node, mtype);
>> +                    tipc_update_nametbl(net, item, node, msg_type(hdr));
>>                      item++;
>>              }
>>              kfree_skb(skb);
>> @@ -345,6 +402,6 @@ void tipc_named_reinit(struct net *net)
>>              publ->node = self;
>>      list_for_each_entry_rcu(publ, &nt->cluster_scope, binding_node)
>>              publ->node = self;
>> -
>> +    nt->rc_dests = 0;
>>      spin_unlock_bh(&tn->nametbl_lock);
>>    }
>> diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h
>> index 63fc73e0fa6c..092323158f06 100644
>> --- a/net/tipc/name_distr.h
>> +++ b/net/tipc/name_distr.h
>> @@ -67,11 +67,14 @@ struct distr_item {
>>      __be32 key;
>>    };
>>    
>> +void tipc_named_bcast(struct net *net, struct sk_buff *skb);
>>    struct sk_buff *tipc_named_publish(struct net *net, struct publication 
>> *publ);
>>    struct sk_buff *tipc_named_withdraw(struct net *net, struct publication 
>> *publ);
>> -void tipc_named_node_up(struct net *net, u32 dnode);
>> -void tipc_named_rcv(struct net *net, struct sk_buff_head *msg_queue);
>> +void tipc_named_node_up(struct net *net, u32 dnode, u16 capabilities);
>> +void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq,
>> +                u16 *rcv_nxt, bool *open);
>>    void tipc_named_reinit(struct net *net);
>> -void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 
>> addr);
>> +void tipc_publ_notify(struct net *net, struct list_head *nsub_list,
>> +                  u32 addr, u16 capabilities);
>>    
>>    #endif
>> diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
>> index 359b2bc888cf..2ac33d32edc2 100644
>> --- a/net/tipc/name_table.c
>> +++ b/net/tipc/name_table.c
>> @@ -729,6 +729,7 @@ struct publication *tipc_nametbl_publish(struct net 
>> *net, u32 type, u32 lower,
>>      struct tipc_net *tn = tipc_net(net);
>>      struct publication *p = NULL;
>>      struct sk_buff *skb = NULL;
>> +    u32 rc_dests;
>>    
>>      spin_lock_bh(&tn->nametbl_lock);
>>    
>> @@ -743,12 +744,14 @@ struct publication *tipc_nametbl_publish(struct net 
>> *net, u32 type, u32 lower,
>>              nt->local_publ_count++;
>>              skb = tipc_named_publish(net, p);
>>      }
>> +    rc_dests = nt->rc_dests;
>>    exit:
>>      spin_unlock_bh(&tn->nametbl_lock);
>>    
>>      if (skb)
>> -            tipc_node_broadcast(net, skb);
>> +            tipc_node_broadcast(net, skb, rc_dests);
>>      return p;
>> +
>>    }
>>    
>>    /**
>> @@ -762,6 +765,7 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 
>> lower,
>>      u32 self = tipc_own_addr(net);
>>      struct sk_buff *skb = NULL;
>>      struct publication *p;
>> +    u32 rc_dests;
>>    
>>      spin_lock_bh(&tn->nametbl_lock);
>>    
>> @@ -775,10 +779,11 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, 
>> u32 lower,
>>              pr_err("Failed to remove local publication {%u,%u,%u}/%u\n",
>>                     type, lower, upper, key);
>>      }
>> +    rc_dests = nt->rc_dests;
>>      spin_unlock_bh(&tn->nametbl_lock);
>>    
>>      if (skb) {
>> -            tipc_node_broadcast(net, skb);
>> +            tipc_node_broadcast(net, skb, rc_dests);
>>              return 1;
>>      }
>>      return 0;
>> diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
>> index 728bc7016c38..8064e1986e2c 100644
>> --- a/net/tipc/name_table.h
>> +++ b/net/tipc/name_table.h
>> @@ -106,6 +106,8 @@ struct name_table {
>>      struct list_head cluster_scope;
>>      rwlock_t cluster_scope_lock;
>>      u32 local_publ_count;
>> +    u32 rc_dests;
>> +    u32 snd_nxt;
>>    };
>>    
>>    int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback 
>> *cb);
>> diff --git a/net/tipc/node.c b/net/tipc/node.c
>> index 803a3a6d0f50..ad8d7bce1f98 100644
>> --- a/net/tipc/node.c
>> +++ b/net/tipc/node.c
>> @@ -75,6 +75,8 @@ struct tipc_bclink_entry {
>>      struct sk_buff_head arrvq;
>>      struct sk_buff_head inputq2;
>>      struct sk_buff_head namedq;
>> +    u16 named_rcv_nxt;
>> +    bool named_open;
>>    };
>>    
>>    /**
>> @@ -396,10 +398,10 @@ static void tipc_node_write_unlock(struct tipc_node *n)
>>      write_unlock_bh(&n->lock);
>>    
>>      if (flags & TIPC_NOTIFY_NODE_DOWN)
>> -            tipc_publ_notify(net, publ_list, addr);
>> +            tipc_publ_notify(net, publ_list, addr, n->capabilities);
>>    
>>      if (flags & TIPC_NOTIFY_NODE_UP)
>> -            tipc_named_node_up(net, addr);
>> +            tipc_named_node_up(net, addr, n->capabilities);
>>    
>>      if (flags & TIPC_NOTIFY_LINK_UP) {
>>              tipc_mon_peer_up(net, addr, bearer_id);
>> @@ -1729,12 +1731,23 @@ int tipc_node_distr_xmit(struct net *net, struct 
>> sk_buff_head *xmitq)
>>      return 0;
>>    }
>>    
>> -void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
>> +void tipc_node_broadcast(struct net *net, struct sk_buff *skb, int rc_dests)
>>    {
>> +    struct sk_buff_head xmitq;
>>      struct sk_buff *txskb;
>>      struct tipc_node *n;
>> +    u16 dummy;
>>      u32 dst;
>>    
>> +    /* Use broadcast if all nodes support it */
>> +    if (!rc_dests && tipc_bcast_get_mode(net) != BCLINK_MODE_RCAST) {
>> +            __skb_queue_head_init(&xmitq);
>> +            __skb_queue_tail(&xmitq, skb);
>> +            tipc_bcast_xmit(net, &xmitq, &dummy);
>> +            return;
>> +    }
>> +
>> +    /* Otherwise use legacy replicast method */
>>      rcu_read_lock();
>>      list_for_each_entry_rcu(n, tipc_nodes(net), list) {
>>              dst = n->addr;
>> @@ -1749,7 +1762,6 @@ void tipc_node_broadcast(struct net *net, struct 
>> sk_buff *skb)
>>              tipc_node_xmit_skb(net, txskb, dst, 0);
>>      }
>>      rcu_read_unlock();
>> -
>>      kfree_skb(skb);
>>    }
>>    
>> @@ -1844,7 +1856,9 @@ static void tipc_node_bc_rcv(struct net *net, struct 
>> sk_buff *skb, int bearer_id
>>    
>>      /* Handle NAME_DISTRIBUTOR messages sent from 1.7 nodes */
>>      if (!skb_queue_empty(&n->bc_entry.namedq))
>> -            tipc_named_rcv(net, &n->bc_entry.namedq);
>> +            tipc_named_rcv(net, &n->bc_entry.namedq,
>> +                           &n->bc_entry.named_rcv_nxt,
>> +                           &n->bc_entry.named_open);
>>    
>>      /* If reassembly or retransmission failure => reset all links to peer */
>>      if (rc & TIPC_LINK_DOWN_EVT)
>> @@ -2109,7 +2123,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, 
>> struct tipc_bearer *b)
>>              tipc_node_link_down(n, bearer_id, false);
>>    
>>      if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
>> -            tipc_named_rcv(net, &n->bc_entry.namedq);
>> +            tipc_named_rcv(net, &n->bc_entry.namedq,
>> +                           &n->bc_entry.named_rcv_nxt,
>> +                           &n->bc_entry.named_open);
>>    
>>      if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1)))
>>              tipc_node_mcast_rcv(n);
>> diff --git a/net/tipc/node.h b/net/tipc/node.h
>> index a6803b449a2c..9f6f13f1604f 100644
>> --- a/net/tipc/node.h
>> +++ b/net/tipc/node.h
>> @@ -55,7 +55,8 @@ enum {
>>      TIPC_MCAST_RBCTL      = (1 << 7),
>>      TIPC_GAP_ACK_BLOCK    = (1 << 8),
>>      TIPC_TUNNEL_ENHANCED  = (1 << 9),
>> -    TIPC_NAGLE            = (1 << 10)
>> +    TIPC_NAGLE            = (1 << 10),
>> +    TIPC_NAMED_BCAST      = (1 << 11)
>>    };
>>    
>>    #define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT           |  \
>> @@ -68,7 +69,8 @@ enum {
>>                              TIPC_MCAST_RBCTL       |   \
>>                              TIPC_GAP_ACK_BLOCK     |   \
>>                              TIPC_TUNNEL_ENHANCED   |   \
>> -                            TIPC_NAGLE)
>> +                            TIPC_NAGLE             |   \
>> +                            TIPC_NAMED_BCAST)
>>    
>>    #define INVALID_BEARER_ID -1
>>    
>> @@ -101,7 +103,7 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff 
>> *skb, u32 dest,
>>                     u32 selector);
>>    void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32 
>> addr);
>>    void tipc_node_unsubscribe(struct net *net, struct list_head *subscr, u32 
>> addr);
>> -void tipc_node_broadcast(struct net *net, struct sk_buff *skb);
>> +void tipc_node_broadcast(struct net *net, struct sk_buff *skb, int 
>> rc_dests);
>>    int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 
>> peer_port);
>>    void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
>>    int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel, bool connected);


_______________________________________________
tipc-discussion mailing list
tipc-discussion@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to