Hi Jon,

This change also takes us back to the original state, in which there was a flag
member in the bearer struct used to indicate whether the bearer is up or down.

Actually, we once had many discussions about it, and ultimately we removed it
from the bearer struct.

Now, if we re-add it to the bearer struct, we still face the same risk as
before: how can we ensure that the "up" flag is safely accessed from different
contexts simultaneously?

Under the current locking policy for protecting tn->bearer_list[], it's really
unsafe for us to access the flag from different contexts at the same time. This
is because bearer_list[] is protected by RCU. For an RCU-protected object, we
cannot directly update one of its members; instead we must follow the
Read-Copy-Update rule of RCU. For instance, for a bearer object, we should
first allocate a new bearer object, set its member to the new value, and then
replace the pointer in bearer_list[] that points to the old bearer object with
one pointing to the new bearer object. Finally, we release the old bearer at a
proper time.

In addition, there is an alternative solution. When the interface goes down, we
don't reset the bearer with tipc_reset_bearer(); instead we disable the bearer
attached to the interface with bearer_disable(). As an interface going down is
a rare event, this approach should be feasible in practice.

Regards,
Ying

On 03/27/2016 04:05 AM, Jon Maloy wrote:
> Resetting a bearer/interface, with the consequence of resetting all its
> pertaining links, is not an atomic action. This becomes particularly
> evident in very large clusters, where a lot of traffic may happen on the
> remaining links while we are busy shutting them down. In extreme cases,
> we may even see links being re-created and re-established before we are
> finished with the job.
> 
> Since we don't want to detach the bearer from the interface if the
> latter is only being cycled, we now introduce a boolean field in
> struct tipc_bearer which serves as an indicator of whether packets
> can be sent or received at the moment. The rule is simple; if the flag
> is set to true, all packets are let through, if it is false, all packets
> are dropped, with the exception of outgoing RESET messages. The latter
> will speed up the neighbors' detection of the node event, and saves them
> unnecessary probing.
> 
> Signed-off-by: Jon Maloy <jon.ma...@ericsson.com>
> ---
>  net/tipc/bearer.c | 22 +++++++++++++++-------
>  net/tipc/bearer.h |  2 ++
>  net/tipc/msg.h    |  5 +++++
>  3 files changed, 22 insertions(+), 7 deletions(-)
> 
> diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
> index 2c66013..50a4bd30 100644
> --- a/net/tipc/bearer.c
> +++ b/net/tipc/bearer.c
> @@ -310,7 +310,7 @@ restart:
>       }
>  
>       rcu_assign_pointer(tn->bearer_list[bearer_id], b);
> -
> +     b->up = true;
>       pr_info("Enabled bearer <%s>, discovery domain %s, priority %u\n",
>               name,
>               tipc_addr_string_fill(addr_string, disc_domain), priority);
> @@ -336,12 +336,13 @@ static int tipc_reset_bearer(struct net *net, struct 
> tipc_bearer *b)
>  static void bearer_disable(struct net *net, struct tipc_bearer *b)
>  {
>       struct tipc_net *tn = net_generic(net, tipc_net_id);
> +     int bearer_id = b->identity;
>       u32 i;
>  
>       pr_info("Disabling bearer <%s>\n", b->name);
>       b->media->disable_media(b);
> -
> -     tipc_node_delete_links(net, b->identity);
> +     b->up = false;
> +     tipc_node_delete_links(net, bearer_id);
>       RCU_INIT_POINTER(b->media_ptr, NULL);
>       if (b->link_req)
>               tipc_disc_delete(b->link_req);
> @@ -448,7 +449,7 @@ void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id,
>  
>       rcu_read_lock();
>       b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
> -     if (likely(b))
> +     if (likely(b && b->up))
>               b->media->send_msg(net, skb, b, dest);
>       else
>               kfree_skb(skb);
> @@ -474,7 +475,10 @@ void tipc_bearer_xmit(struct net *net, u32 bearer_id,
>               __skb_queue_purge(xmitq);
>       skb_queue_walk_safe(xmitq, skb, tmp) {
>               __skb_dequeue(xmitq);
> -             b->media->send_msg(net, skb, b, dst);
> +             if (likely(b->up || msg_is_reset(buf_msg(skb))))
> +                     b->media->send_msg(net, skb, b, dst);
> +             else
> +                     kfree_skb(skb);
>       }
>       rcu_read_unlock();
>  }
> @@ -492,7 +496,7 @@ void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id,
>  
>       rcu_read_lock();
>       b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
> -     if (unlikely(!b))
> +     if (unlikely(!b || !b->up))
>               __skb_queue_purge(xmitq);
>       skb_queue_walk_safe(xmitq, skb, tmp) {
>               hdr = buf_msg(skb);
> @@ -522,7 +526,7 @@ static int tipc_l2_rcv_msg(struct sk_buff *skb, struct 
> net_device *dev,
>  
>       rcu_read_lock();
>       b = rcu_dereference_rtnl(dev->tipc_ptr);
> -     if (likely(b && (skb->pkt_type <= PACKET_BROADCAST))) {
> +     if (likely(b && b->up && (skb->pkt_type <= PACKET_BROADCAST))) {
>               skb->next = NULL;
>               tipc_rcv(dev_net(dev), skb, b);
>               rcu_read_unlock();
> @@ -559,7 +563,11 @@ static int tipc_l2_device_event(struct notifier_block 
> *nb, unsigned long evt,
>       case NETDEV_CHANGE:
>               if (netif_carrier_ok(dev))
>                       break;
> +     case NETDEV_UP:
> +             b->up = true;
> +             break;
>       case NETDEV_GOING_DOWN:
> +             b->up = false;
>       case NETDEV_CHANGEMTU:
>               tipc_reset_bearer(net, b);
>               break;
> diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
> index 8c52a34..f0aa0d7 100644
> --- a/net/tipc/bearer.h
> +++ b/net/tipc/bearer.h
> @@ -142,6 +142,7 @@ struct tipc_media {
>   * @identity: array index of this bearer within TIPC bearer array
>   * @link_req: ptr to (optional) structure making periodic link setup requests
>   * @net_plane: network plane ('A' through 'H') currently associated with 
> bearer
> + * @up: indicates if bearer is open for reception/transmission
>   *
>   * Note: media-specific code is responsible for initialization of the fields
>   * indicated below when a bearer is enabled; TIPC's generic bearer code takes
> @@ -162,6 +163,7 @@ struct tipc_bearer {
>       u32 identity;
>       struct tipc_link_req *link_req;
>       char net_plane;
> +     bool up;
>  };
>  
>  struct tipc_bearer_names {
> diff --git a/net/tipc/msg.h b/net/tipc/msg.h
> index 55778a0..f34f639 100644
> --- a/net/tipc/msg.h
> +++ b/net/tipc/msg.h
> @@ -779,6 +779,11 @@ static inline bool msg_peer_node_is_up(struct tipc_msg 
> *m)
>       return msg_redundant_link(m);
>  }
>  
> +static inline bool msg_is_reset(struct tipc_msg *hdr)
> +{
> +     return (msg_user(hdr) == LINK_PROTOCOL) && (msg_type(hdr) == RESET_MSG);
> +}
> +
>  struct sk_buff *tipc_buf_acquire(u32 size);
>  bool tipc_msg_validate(struct sk_buff *skb);
>  bool tipc_msg_reverse(u32 own_addr, struct sk_buff **skb, int err);
> 


------------------------------------------------------------------------------
Transform Data into Opportunity.
Accelerate data analysis in your applications with
Intel Data Analytics Acceleration Library.
Click to learn more.
http://pubads.g.doubleclick.net/gampad/clk?id=278785471&iu=/4140
_______________________________________________
tipc-discussion mailing list
tipc-discussion@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to