On 6/6/20 11:10 PM, Hoang Huu Le wrote:
-----Original Message-----
From: Jon Maloy <jma...@redhat.com>
Sent: Friday, June 5, 2020 8:03 PM
To: Hoang Huu Le <hoang.h...@dektech.com.au>; ma...@donjonn.com; 
ying....@windriver.com; tipc-discussion@lists.sourceforge.net
Subject: Re: [next-net v6] tipc: update a binding service via broadcast



On 6/5/20 3:52 AM, Hoang Huu Le wrote:
Currently, updating binding table (add service binding to
name table/withdraw a service binding) is being sent over replicast.
However, if we are scaling up clusters to > 100 nodes/containers this
method is less affection because of looping through nodes in a cluster one
by one.
[...]
+               if (*open && (*rcv_nxt == seqno)) {
+                       (*rcv_nxt)++;
+                       __skb_unlink(skb, namedq);
+                       return skb;
+               }
+
+               if (less(seqno, *rcv_nxt)) {
+                       __skb_unlink(skb, namedq);
+                       kfree_skb(skb);
+                       continue;
Still not needed. This queue should be flushed in
tipc_node_lost_contact(), which I now see we don't do.
[Hoang] Yes, that's right. I will verify and send it out.

This has to e fixed too.
///jon
I hate to admit it, but we might actually need this test after all. Imagine that somebody has done 'publish' just after the broadcast link came up (in tipc_bcast_add_peer()) , but before tipc_named_node_up() is called. The context of those two calls is not atomic, so I think it is possible that this publication might end up both in the bcast_link backlog queue and in the bulk distribution. This publication message will have a named_seqno that is lower than the agreed synch point, and should be dropped at reception.

Given the crucial role of the binding table for the overall TIPC functionality I think it is better be safe than sorry here, and keep this test.

///jon

+               }
+       }
+       return NULL;
+}
+
   /**
    * tipc_named_rcv - process name table update messages sent by another node
    */
-void tipc_named_rcv(struct net *net, struct sk_buff_head *inputq)
+void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq,
+                   u16 *rcv_nxt, bool *open)
   {
-       struct tipc_net *tn = net_generic(net, tipc_net_id);
-       struct tipc_msg *msg;
+       struct tipc_net *tn = tipc_net(net);
        struct distr_item *item;
-       uint count;
-       u32 node;
+       struct tipc_msg *hdr;
        struct sk_buff *skb;
-       int mtype;
+       u32 count, node = 0;
spin_lock_bh(&tn->nametbl_lock);
-       for (skb = skb_dequeue(inputq); skb; skb = skb_dequeue(inputq)) {
-               skb_linearize(skb);
-               msg = buf_msg(skb);
-               mtype = msg_type(msg);
-               item = (struct distr_item *)msg_data(msg);
-               count = msg_data_sz(msg) / ITEM_SIZE;
-               node = msg_orignode(msg);
+       while ((skb = tipc_named_dequeue(namedq, rcv_nxt, open))) {
+               hdr = buf_msg(skb);
+               node = msg_orignode(hdr);
+               item = (struct distr_item *)msg_data(hdr);
+               count = msg_data_sz(hdr) / ITEM_SIZE;
                while (count--) {
-                       tipc_update_nametbl(net, item, node, mtype);
+                       tipc_update_nametbl(net, item, node, msg_type(hdr));
                        item++;
                }
                kfree_skb(skb);
@@ -345,6 +402,6 @@ void tipc_named_reinit(struct net *net)
                publ->node = self;
        list_for_each_entry_rcu(publ, &nt->cluster_scope, binding_node)
                publ->node = self;
-
+       nt->rc_dests = 0;
        spin_unlock_bh(&tn->nametbl_lock);
   }
diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h
index 63fc73e0fa6c..092323158f06 100644
--- a/net/tipc/name_distr.h
+++ b/net/tipc/name_distr.h
@@ -67,11 +67,14 @@ struct distr_item {
        __be32 key;
   };
+void tipc_named_bcast(struct net *net, struct sk_buff *skb);
   struct sk_buff *tipc_named_publish(struct net *net, struct publication 
*publ);
   struct sk_buff *tipc_named_withdraw(struct net *net, struct publication 
*publ);
-void tipc_named_node_up(struct net *net, u32 dnode);
-void tipc_named_rcv(struct net *net, struct sk_buff_head *msg_queue);
+void tipc_named_node_up(struct net *net, u32 dnode, u16 capabilities);
+void tipc_named_rcv(struct net *net, struct sk_buff_head *namedq,
+                   u16 *rcv_nxt, bool *open);
   void tipc_named_reinit(struct net *net);
-void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr);
+void tipc_publ_notify(struct net *net, struct list_head *nsub_list,
+                     u32 addr, u16 capabilities);
#endif
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 359b2bc888cf..2ac33d32edc2 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -729,6 +729,7 @@ struct publication *tipc_nametbl_publish(struct net *net, 
u32 type, u32 lower,
        struct tipc_net *tn = tipc_net(net);
        struct publication *p = NULL;
        struct sk_buff *skb = NULL;
+       u32 rc_dests;
spin_lock_bh(&tn->nametbl_lock); @@ -743,12 +744,14 @@ struct publication *tipc_nametbl_publish(struct net *net, u32 type, u32 lower,
                nt->local_publ_count++;
                skb = tipc_named_publish(net, p);
        }
+       rc_dests = nt->rc_dests;
   exit:
        spin_unlock_bh(&tn->nametbl_lock);
if (skb)
-               tipc_node_broadcast(net, skb);
+               tipc_node_broadcast(net, skb, rc_dests);
        return p;
+
   }
/**
@@ -762,6 +765,7 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 
lower,
        u32 self = tipc_own_addr(net);
        struct sk_buff *skb = NULL;
        struct publication *p;
+       u32 rc_dests;
spin_lock_bh(&tn->nametbl_lock); @@ -775,10 +779,11 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower,
                pr_err("Failed to remove local publication {%u,%u,%u}/%u\n",
                       type, lower, upper, key);
        }
+       rc_dests = nt->rc_dests;
        spin_unlock_bh(&tn->nametbl_lock);
if (skb) {
-               tipc_node_broadcast(net, skb);
+               tipc_node_broadcast(net, skb, rc_dests);
                return 1;
        }
        return 0;
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index 728bc7016c38..8064e1986e2c 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -106,6 +106,8 @@ struct name_table {
        struct list_head cluster_scope;
        rwlock_t cluster_scope_lock;
        u32 local_publ_count;
+       u32 rc_dests;
+       u32 snd_nxt;
   };
int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 803a3a6d0f50..ad8d7bce1f98 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -75,6 +75,8 @@ struct tipc_bclink_entry {
        struct sk_buff_head arrvq;
        struct sk_buff_head inputq2;
        struct sk_buff_head namedq;
+       u16 named_rcv_nxt;
+       bool named_open;
   };
/**
@@ -396,10 +398,10 @@ static void tipc_node_write_unlock(struct tipc_node *n)
        write_unlock_bh(&n->lock);
if (flags & TIPC_NOTIFY_NODE_DOWN)
-               tipc_publ_notify(net, publ_list, addr);
+               tipc_publ_notify(net, publ_list, addr, n->capabilities);
if (flags & TIPC_NOTIFY_NODE_UP)
-               tipc_named_node_up(net, addr);
+               tipc_named_node_up(net, addr, n->capabilities);
if (flags & TIPC_NOTIFY_LINK_UP) {
                tipc_mon_peer_up(net, addr, bearer_id);
@@ -1729,12 +1731,23 @@ int tipc_node_distr_xmit(struct net *net, struct 
sk_buff_head *xmitq)
        return 0;
   }
-void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
+void tipc_node_broadcast(struct net *net, struct sk_buff *skb, int rc_dests)
   {
+       struct sk_buff_head xmitq;
        struct sk_buff *txskb;
        struct tipc_node *n;
+       u16 dummy;
        u32 dst;
+ /* Use broadcast if all nodes support it */
+       if (!rc_dests && tipc_bcast_get_mode(net) != BCLINK_MODE_RCAST) {
+               __skb_queue_head_init(&xmitq);
+               __skb_queue_tail(&xmitq, skb);
+               tipc_bcast_xmit(net, &xmitq, &dummy);
+               return;
+       }
+
+       /* Otherwise use legacy replicast method */
        rcu_read_lock();
        list_for_each_entry_rcu(n, tipc_nodes(net), list) {
                dst = n->addr;
@@ -1749,7 +1762,6 @@ void tipc_node_broadcast(struct net *net, struct sk_buff 
*skb)
                tipc_node_xmit_skb(net, txskb, dst, 0);
        }
        rcu_read_unlock();
-
        kfree_skb(skb);
   }
@@ -1844,7 +1856,9 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id /* Handle NAME_DISTRIBUTOR messages sent from 1.7 nodes */
        if (!skb_queue_empty(&n->bc_entry.namedq))
-               tipc_named_rcv(net, &n->bc_entry.namedq);
+               tipc_named_rcv(net, &n->bc_entry.namedq,
+                              &n->bc_entry.named_rcv_nxt,
+                              &n->bc_entry.named_open);
/* If reassembly or retransmission failure => reset all links to peer */
        if (rc & TIPC_LINK_DOWN_EVT)
@@ -2109,7 +2123,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, 
struct tipc_bearer *b)
                tipc_node_link_down(n, bearer_id, false);
if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
-               tipc_named_rcv(net, &n->bc_entry.namedq);
+               tipc_named_rcv(net, &n->bc_entry.namedq,
+                              &n->bc_entry.named_rcv_nxt,
+                              &n->bc_entry.named_open);
if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1)))
                tipc_node_mcast_rcv(n);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index a6803b449a2c..9f6f13f1604f 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -55,7 +55,8 @@ enum {
        TIPC_MCAST_RBCTL      = (1 << 7),
        TIPC_GAP_ACK_BLOCK    = (1 << 8),
        TIPC_TUNNEL_ENHANCED  = (1 << 9),
-       TIPC_NAGLE            = (1 << 10)
+       TIPC_NAGLE            = (1 << 10),
+       TIPC_NAMED_BCAST      = (1 << 11)
   };
#define TIPC_NODE_CAPABILITIES (TIPC_SYN_BIT | \
@@ -68,7 +69,8 @@ enum {
                                TIPC_MCAST_RBCTL       |   \
                                TIPC_GAP_ACK_BLOCK     |   \
                                TIPC_TUNNEL_ENHANCED   |   \
-                               TIPC_NAGLE)
+                               TIPC_NAGLE             |   \
+                               TIPC_NAMED_BCAST)
#define INVALID_BEARER_ID -1 @@ -101,7 +103,7 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
                       u32 selector);
   void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32 
addr);
   void tipc_node_unsubscribe(struct net *net, struct list_head *subscr, u32 
addr);
-void tipc_node_broadcast(struct net *net, struct sk_buff *skb);
+void tipc_node_broadcast(struct net *net, struct sk_buff *skb, int rc_dests);
   int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
   void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
   int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel, bool connected);



_______________________________________________
tipc-discussion mailing list
tipc-discussion@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to