Hi Jon,

When I apply this patch, ptts case 12 (multicast) fails when the 
client and server are running on the same node.

/Partha

On 12/22/2016 04:15 PM, Jon Maloy wrote:
> TIPC multicast messages are currently carried over a reliable
> 'broadcast link', making use of the underlying media's ability to
> transport packets as L2 broadcast or IP multicast to all nodes in
> the cluster.
>
> When the bearer in use lacks that ability, we can instead emulate
> the broadcast service by replicating the packets and sending them
> over as many unicast links as needed to reach all identified
> destinations. We now introduce a new TIPC link-level 'replicast'
> service that does this.
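>
> In rough terms the new transmit decision works like the sketch below.
> This is only a self-contained user-space illustration of the idea;
> the names are made up and are not TIPC symbols:
>
>     #include <stdbool.h>
>     #include <stdio.h>
>
>     /* Destinations of one multicast message (illustrative only) */
>     struct dest_list {
>             unsigned int nodes[8];  /* remote node addresses */
>             int remote;             /* number of remote destinations */
>             bool local;             /* any sockets on own node? */
>     };
>
>     /* Broadcast once if the bearer supports it; otherwise "replicast",
>      * i.e. send one unicast copy per remote destination.
>      */
>     static void mcast_xmit(const char *msg, const struct dest_list *d,
>                            bool bcast_supported)
>     {
>             int i;
>
>             if (d->local)
>                     printf("local delivery: %s\n", msg);
>             if (!d->remote)
>                     return;
>             if (bcast_supported) {
>                     printf("broadcast link: %s\n", msg);
>                     return;
>             }
>             for (i = 0; i < d->remote; i++)
>                     printf("unicast copy to %u: %s\n", d->nodes[i], msg);
>     }
>
>     int main(void)
>     {
>             struct dest_list d = { .nodes = { 1001, 1002 },
>                                    .remote = 2, .local = true };
>
>             mcast_xmit("hello", &d, false); /* bearer without broadcast */
>             return 0;
>     }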
>
> Signed-off-by: Jon Maloy <jon.ma...@ericsson.com>
> ---
>  net/tipc/bcast.c  | 105 ++++++++++++++++++++++++++++++++++++++++++------------
>  net/tipc/bcast.h  |   3 +-
>  net/tipc/link.c   |   8 ++++-
>  net/tipc/msg.c    |  17 +++++++++
>  net/tipc/msg.h    |   9 +++--
>  net/tipc/node.c   |  27 +++++++++-----
>  net/tipc/socket.c |  27 +++++++++-----
>  7 files changed, 149 insertions(+), 47 deletions(-)
>
> diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
> index 412d335..672e6ef 100644
> --- a/net/tipc/bcast.c
> +++ b/net/tipc/bcast.c
> @@ -70,7 +70,7 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)
>
>  int tipc_bcast_get_mtu(struct net *net)
>  {
> -     return tipc_link_mtu(tipc_bc_sndlink(net));
> +     return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE;
>  }
>
>  /* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
> @@ -175,42 +175,101 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
>       __skb_queue_purge(&_xmitq);
>  }
>
> -/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster
> - *                    and to identified node local sockets
> +/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes
>   * @net: the applicable net namespace
> - * @list: chain of buffers containing message
> + * @pkts: chain of buffers containing message
> + * @cong_link_cnt: set to 1 if broadcast link is congested, otherwise 0
>   * Consumes the buffer chain.
> - * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
> + * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE
>   */
> -int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
> +static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
> +                        u16 *cong_link_cnt)
>  {
>       struct tipc_link *l = tipc_bc_sndlink(net);
> -     struct sk_buff_head xmitq, inputq, rcvq;
> +     struct sk_buff_head xmitq;
>       int rc = 0;
>
> -     __skb_queue_head_init(&rcvq);
>       __skb_queue_head_init(&xmitq);
> -     skb_queue_head_init(&inputq);
> -
> -     /* Prepare message clone for local node */
> -     if (unlikely(!tipc_msg_reassemble(list, &rcvq)))
> -             return -EHOSTUNREACH;
> -
>       tipc_bcast_lock(net);
>       if (tipc_link_bc_peers(l))
> -             rc = tipc_link_xmit(l, list, &xmitq);
> +             rc = tipc_link_xmit(l, pkts, &xmitq);
>       tipc_bcast_unlock(net);
> +     tipc_bcbase_xmit(net, &xmitq);
> +     __skb_queue_purge(pkts);
> +     if (rc == -ELINKCONG) {
> +             *cong_link_cnt = 1;
> +             rc = 0;
> +     }
> +     return rc;
> +}
>
> -     /* Don't send to local node if adding to link failed */
> -     if (unlikely(rc && (rc != -ELINKCONG))) {
> -             __skb_queue_purge(&rcvq);
> -             return rc;
> +/* tipc_rcast_xmit - replicate and send a message to given destination nodes
> + * @net: the applicable net namespace
> + * @pkts: chain of buffers containing message
> + * @dests: list of destination nodes
> + * @cong_link_cnt: returns number of congested links
> + * Returns 0 if success, otherwise errno
> + */
> +static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
> +                        struct tipc_nlist *dests, u16 *cong_link_cnt)
> +{
> +     struct sk_buff_head _pkts;
> +     struct u32_item *n, *tmp;
> +     u32 dst, selector;
> +
> +     selector = msg_link_selector(buf_msg(skb_peek(pkts)));
> +     __skb_queue_head_init(&_pkts);
> +
> +     list_for_each_entry_safe(n, tmp, &dests->list, list) {
> +             dst = n->value;
> +             if (!tipc_msg_pskb_copy(dst, pkts, &_pkts))
> +                     return -ENOMEM;
> +
> +             /* Any other return value than -ELINKCONG is ignored */
> +             if (tipc_node_xmit(net, &_pkts, dst, selector) == -ELINKCONG)
> +                     (*cong_link_cnt)++;
>       }
> +     return 0;
> +}
>
> -     /* Broadcast to all nodes, inluding local node */
> -     tipc_bcbase_xmit(net, &xmitq);
> -     tipc_sk_mcast_rcv(net, &rcvq, &inputq);
> -     __skb_queue_purge(list);
> +/* tipc_mcast_xmit - deliver message to indicated destination nodes
> + *                   and to identified node local sockets
> + * @net: the applicable net namespace
> + * @pkts: chain of buffers containing message
> + * @dests: destination nodes for message. Not consumed.
> + * @cong_link_cnt: returns number of encountered congested destination links
> + * Consumes buffer chain.
> + * Returns 0 if success, otherwise errno
> + */
> +int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
> +                 struct tipc_nlist *dests, u16 *cong_link_cnt)
> +{
> +     struct tipc_bc_base *bb = tipc_bc_base(net);
> +     struct sk_buff_head inputq, localq;
> +     int rc = 0;
> +
> +     skb_queue_head_init(&inputq);
> +     skb_queue_head_init(&localq);
> +
> +     /* Clone packets before they are consumed by next call */
> +     if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
> +             rc = -ENOMEM;
> +             goto exit;
> +     }
> +
> +     if (dests->remote) {
> +             if (!bb->bcast_support)
> +                     rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
> +             else
> +                     rc = tipc_bcast_xmit(net, pkts, cong_link_cnt);
> +     }
> +
> +     if (dests->local)
> +             tipc_sk_mcast_rcv(net, &localq, &inputq);
> +exit:
> +     __skb_queue_purge(pkts);
>       return rc;
>  }
>
> diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
> index 18f3791..dd772e6 100644
> --- a/net/tipc/bcast.h
> +++ b/net/tipc/bcast.h
> @@ -66,7 +66,8 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl);
>  void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);
>  void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);
>  int  tipc_bcast_get_mtu(struct net *net);
> -int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
> +int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
> +                 struct tipc_nlist *dests, u16 *cong_link_cnt);
>  int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
>  void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
>                       struct tipc_msg *hdr);
> diff --git a/net/tipc/link.c b/net/tipc/link.c
> index b758ca8..d1766ce 100644
> --- a/net/tipc/link.c
> +++ b/net/tipc/link.c
> @@ -1032,11 +1032,17 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
>  static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
>                           struct sk_buff_head *inputq)
>  {
> -     switch (msg_user(buf_msg(skb))) {
> +     struct tipc_msg *hdr = buf_msg(skb);
> +
> +     switch (msg_user(hdr)) {
>       case TIPC_LOW_IMPORTANCE:
>       case TIPC_MEDIUM_IMPORTANCE:
>       case TIPC_HIGH_IMPORTANCE:
>       case TIPC_CRITICAL_IMPORTANCE:
> +             if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) {
> +                     skb_queue_tail(l->bc_rcvlink->inputq, skb);
> +                     return true;
> +             }
>       case CONN_MANAGER:
>               skb_queue_tail(inputq, skb);
>               return true;
> diff --git a/net/tipc/msg.c b/net/tipc/msg.c
> index 17201aa..f4e6197 100644
> --- a/net/tipc/msg.c
> +++ b/net/tipc/msg.c
> @@ -607,6 +607,23 @@ bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq)
>       return false;
>  }
>
> +bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
> +                     struct sk_buff_head *cpy)
> +{
> +     struct sk_buff *skb, *_skb;
> +
> +     skb_queue_walk(msg, skb) {
> +             _skb = pskb_copy(skb, GFP_ATOMIC);
> +             if (!_skb) {
> +                     __skb_queue_purge(cpy);
> +                     return false;
> +             }
> +             msg_set_destnode(buf_msg(_skb), dst);
> +             __skb_queue_tail(cpy, _skb);
> +     }
> +     return true;
> +}
> +
>  /* tipc_skb_queue_sorted(); sort pkt into list according to sequence number
>   * @list: list to be appended to
>   * @seqno: sequence number of buffer to add
> diff --git a/net/tipc/msg.h b/net/tipc/msg.h
> index 850ae0e..295e477 100644
> --- a/net/tipc/msg.h
> +++ b/net/tipc/msg.h
> @@ -631,14 +631,11 @@ static inline void msg_set_bc_netid(struct tipc_msg *m, u32 id)
>
>  static inline u32 msg_link_selector(struct tipc_msg *m)
>  {
> +     if (msg_user(m) == MSG_FRAGMENTER)
> +             m = (void *)msg_data(m);
>       return msg_bits(m, 4, 0, 1);
>  }
>
> -static inline void msg_set_link_selector(struct tipc_msg *m, u32 n)
> -{
> -     msg_set_bits(m, 4, 0, 1, n);
> -}
> -
>  /*
>   * Word 5
>   */
> @@ -835,6 +832,8 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
>                  int offset, int dsz, int mtu, struct sk_buff_head *list);
>  bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
>  bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);
> +bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
> +                     struct sk_buff_head *cpy);
>  void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
>                            struct sk_buff *skb);
>
> diff --git a/net/tipc/node.c b/net/tipc/node.c
> index 2883f6a..f96dacf 100644
> --- a/net/tipc/node.c
> +++ b/net/tipc/node.c
> @@ -1257,6 +1257,19 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
>       kfree_skb(skb);
>  }
>
> +static void tipc_node_mcast_rcv(struct tipc_node *n)
> +{
> +     struct tipc_bclink_entry *be = &n->bc_entry;
> +
> +     /* 'arrvq' is under inputq2's lock protection */
> +     spin_lock_bh(&be->inputq2.lock);
> +     spin_lock_bh(&be->inputq1.lock);
> +     skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
> +     spin_unlock_bh(&be->inputq1.lock);
> +     spin_unlock_bh(&be->inputq2.lock);
> +     tipc_sk_mcast_rcv(n->net, &be->arrvq, &be->inputq2);
> +}
> +
>  static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr,
>                                 int bearer_id, struct sk_buff_head *xmitq)
>  {
> @@ -1330,15 +1343,8 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
>       if (!skb_queue_empty(&xmitq))
>               tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
>
> -     /* Deliver. 'arrvq' is under inputq2's lock protection */
> -     if (!skb_queue_empty(&be->inputq1)) {
> -             spin_lock_bh(&be->inputq2.lock);
> -             spin_lock_bh(&be->inputq1.lock);
> -             skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
> -             spin_unlock_bh(&be->inputq1.lock);
> -             spin_unlock_bh(&be->inputq2.lock);
> -             tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2);
> -     }
> +     if (!skb_queue_empty(&be->inputq1))
> +             tipc_node_mcast_rcv(n);
>
>       if (rc & TIPC_LINK_DOWN_EVT) {
>               /* Reception reassembly failure => reset all links to peer */
> @@ -1565,6 +1571,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
>       if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
>               tipc_named_rcv(net, &n->bc_entry.namedq);
>
> +     if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1)))
> +             tipc_node_mcast_rcv(n);
> +
>       if (!skb_queue_empty(&le->inputq))
>               tipc_sk_rcv(net, &le->inputq);
>
> diff --git a/net/tipc/socket.c b/net/tipc/socket.c
> index b456265..1f948e6 100644
> --- a/net/tipc/socket.c
> +++ b/net/tipc/socket.c
> @@ -740,32 +740,43 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
>       struct tipc_msg *hdr = &tsk->phdr;
>       struct net *net = sock_net(sk);
>       int mtu = tipc_bcast_get_mtu(net);
> +     u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE);
>       struct sk_buff_head pkts;
> +     struct tipc_nlist dsts;
>       int rc;
>
> +     /* Block or return if any destination link is congested */
>       rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
>       if (unlikely(rc))
>               return rc;
>
> +     /* Lookup destination nodes */
> +     tipc_nlist_init(&dsts, tipc_own_addr(net));
> +     tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower,
> +                                   seq->upper, domain, &dsts);
> +     if (!dsts.local && !dsts.remote)
> +             return -EHOSTUNREACH;
> +
> +     /* Build message header */
>       msg_set_type(hdr, TIPC_MCAST_MSG);
> +     msg_set_hdr_sz(hdr, MCAST_H_SIZE);
>       msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
>       msg_set_destport(hdr, 0);
>       msg_set_destnode(hdr, 0);
>       msg_set_nametype(hdr, seq->type);
>       msg_set_namelower(hdr, seq->lower);
>       msg_set_nameupper(hdr, seq->upper);
> -     msg_set_hdr_sz(hdr, MCAST_H_SIZE);
>
> +     /* Build message as chain of buffers */
>       skb_queue_head_init(&pkts);
>       rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
> -     if (unlikely(rc != dlen))
> -             return rc;
>
> -     rc = tipc_bcast_xmit(net, &pkts);
> -     if (unlikely(rc == -ELINKCONG)) {
> -             tsk->cong_link_cnt = 1;
> -             rc = 0;
> -     }
> +     /* Send message if build was successful */
> +     if (unlikely(rc == dlen))
> +             rc = tipc_mcast_xmit(net, &pkts, &dsts,
> +                                  &tsk->cong_link_cnt);
> +
> +     tipc_nlist_purge(&dsts);
>
>       return rc ? rc : dlen;
>  }
>
