Hi Jon,

When I include this patch, ptts case 12 (multicast) fails when the client and server are running on the same node.
/Partha

On 12/22/2016 04:15 PM, Jon Maloy wrote:
> TIPC multicast messages are currently carried over a reliable
> 'broadcast link', making use of the underlying media's ability to
> transport packets as L2 broadcast or IP multicast to all nodes in
> the cluster.
>
> When the used bearer is lacking that ability, we can instead emulate
> the broadcast service by replicating and sending the packets over as
> many unicast links as needed to reach all identified destinations.
> We now introduce a new TIPC link-level 'replicast' service that does
> this.
>
> Signed-off-by: Jon Maloy <jon.ma...@ericsson.com>
> ---
>  net/tipc/bcast.c  | 105 ++++++++++++++++++++++++++++++++++++++++++------------
>  net/tipc/bcast.h  |   3 +-
>  net/tipc/link.c   |   8 ++++-
>  net/tipc/msg.c    |  17 +++++++++
>  net/tipc/msg.h    |   9 +++--
>  net/tipc/node.c   |  27 +++++++++-----
>  net/tipc/socket.c |  27 +++++++++-----
>  7 files changed, 149 insertions(+), 47 deletions(-)
>
> diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
> index 412d335..672e6ef 100644
> --- a/net/tipc/bcast.c
> +++ b/net/tipc/bcast.c
> @@ -70,7 +70,7 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)
>
>  int tipc_bcast_get_mtu(struct net *net)
>  {
> -        return tipc_link_mtu(tipc_bc_sndlink(net));
> +        return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE;
>  }
>
>  /* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
> @@ -175,42 +175,101 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
>          __skb_queue_purge(&_xmitq);
>  }
>
> -/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster
> - *                   and to identified node local sockets
> +/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes
>   * @net: the applicable net namespace
> - * @list: chain of buffers containing message
> + * @pkts: chain of buffers containing message
> + * @cong_link_cnt: set to 1 if broadcast link is congested, otherwise 0
>   * Consumes the buffer chain.
> - * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
> + * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE
>   */
> -int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
> +static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
> +                           u16 *cong_link_cnt)
>  {
>          struct tipc_link *l = tipc_bc_sndlink(net);
> -        struct sk_buff_head xmitq, inputq, rcvq;
> +        struct sk_buff_head xmitq;
>          int rc = 0;
>
> -        __skb_queue_head_init(&rcvq);
>          __skb_queue_head_init(&xmitq);
> -        skb_queue_head_init(&inputq);
> -
> -        /* Prepare message clone for local node */
> -        if (unlikely(!tipc_msg_reassemble(list, &rcvq)))
> -                return -EHOSTUNREACH;
> -
>          tipc_bcast_lock(net);
>          if (tipc_link_bc_peers(l))
> -                rc = tipc_link_xmit(l, list, &xmitq);
> +                rc = tipc_link_xmit(l, pkts, &xmitq);
>          tipc_bcast_unlock(net);
> +        tipc_bcbase_xmit(net, &xmitq);
> +        __skb_queue_purge(pkts);
> +        if (rc == -ELINKCONG) {
> +                *cong_link_cnt = 1;
> +                rc = 0;
> +        }
> +        return rc;
> +}
>
> -        /* Don't send to local node if adding to link failed */
> -        if (unlikely(rc && (rc != -ELINKCONG))) {
> -                __skb_queue_purge(&rcvq);
> -                return rc;
> +/* tipc_rcast_xmit - replicate and send a message to given destination nodes
> + * @net: the applicable net namespace
> + * @pkts: chain of buffers containing message
> + * @dests: list of destination nodes
> + * @cong_link_cnt: returns number of congested links
> + * @cong_links: returns identities of congested links
> + * Returns 0 if success, otherwise errno
> + */
> +static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
> +                           struct tipc_nlist *dests, u16 *cong_link_cnt)
> +{
> +        struct sk_buff_head _pkts;
> +        struct u32_item *n, *tmp;
> +        u32 dst, selector;
> +
> +        selector = msg_link_selector(buf_msg(skb_peek(pkts)));
> +        __skb_queue_head_init(&_pkts);
> +
> +        list_for_each_entry_safe(n, tmp, &dests->list, list) {
> +                dst = n->value;
> +                if (!tipc_msg_pskb_copy(dst, pkts, &_pkts))
> +                        return -ENOMEM;
> +
> +                /* Any other return value than -ELINKCONG is ignored */
> +                if (tipc_node_xmit(net, &_pkts, dst, selector) == -ELINKCONG)
> +                        (*cong_link_cnt)++;
>          }
> +        return 0;
> +}
>
> -        /* Broadcast to all nodes, inluding local node */
> -        tipc_bcbase_xmit(net, &xmitq);
> -        tipc_sk_mcast_rcv(net, &rcvq, &inputq);
> -        __skb_queue_purge(list);
> +/* tipc_mcast_xmit - deliver message to indicated destination nodes
> + *                   and to identified node local sockets
> + * @net: the applicable net namespace
> + * @pkts: chain of buffers containing message
> + * @dests: destination nodes for message. Not consumed.
> + * @cong_link_cnt: returns number of encountered congested destination links
> + * @cong_links: returns identities of congested links
> + * Consumes buffer chain.
> + * Returns 0 if success, otherwise errno
> + */
> +int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
> +                    struct tipc_nlist *dests, u16 *cong_link_cnt)
> +{
> +        struct tipc_bc_base *bb = tipc_bc_base(net);
> +        struct sk_buff_head inputq, localq;
> +        int rc = 0;
> +
> +        skb_queue_head_init(&inputq);
> +        skb_queue_head_init(&localq);
> +
> +        /* Clone packets before they are consumed by next call */
> +        if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
> +                rc = -ENOMEM;
> +                goto exit;
> +        }
> +
> +        if (dests->remote) {
> +                if (!bb->bcast_support)
> +                        rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
> +                else
> +                        rc = tipc_bcast_xmit(net, pkts, cong_link_cnt);
> +        }
> +
> +        if (dests->local)
> +                tipc_sk_mcast_rcv(net, &localq, &inputq);
> +exit:
> +        __skb_queue_purge(pkts);
>          return rc;
>  }
>
> diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
> index 18f3791..dd772e6 100644
> --- a/net/tipc/bcast.h
> +++ b/net/tipc/bcast.h
> @@ -66,7 +66,8 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl);
>  void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);
>  void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);
>  int tipc_bcast_get_mtu(struct net *net);
> -int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
> +int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
> +                    struct tipc_nlist *dests, u16 *cong_link_cnt);
>  int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
>  void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
>                          struct tipc_msg *hdr);
> diff --git a/net/tipc/link.c b/net/tipc/link.c
> index b758ca8..d1766ce 100644
> --- a/net/tipc/link.c
> +++ b/net/tipc/link.c
> @@ -1032,11 +1032,17 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
>  static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
>                              struct sk_buff_head *inputq)
>  {
> -        switch (msg_user(buf_msg(skb))) {
> +        struct tipc_msg *hdr = buf_msg(skb);
> +
> +        switch (msg_user(hdr)) {
>          case TIPC_LOW_IMPORTANCE:
>          case TIPC_MEDIUM_IMPORTANCE:
>          case TIPC_HIGH_IMPORTANCE:
>          case TIPC_CRITICAL_IMPORTANCE:
> +                if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) {
> +                        skb_queue_tail(l->bc_rcvlink->inputq, skb);
> +                        return true;
> +                }
>          case CONN_MANAGER:
>                  skb_queue_tail(inputq, skb);
>                  return true;
> diff --git a/net/tipc/msg.c b/net/tipc/msg.c
> index 17201aa..f4e6197 100644
> --- a/net/tipc/msg.c
> +++ b/net/tipc/msg.c
> @@ -607,6 +607,23 @@ bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq)
>          return false;
>  }
>
> +bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
> +                        struct sk_buff_head *cpy)
> +{
> +        struct sk_buff *skb, *_skb;
> +
> +        skb_queue_walk(msg, skb) {
> +                _skb = pskb_copy(skb, GFP_ATOMIC);
> +                if (!_skb) {
> +                        __skb_queue_purge(cpy);
> +                        return false;
> +                }
> +                msg_set_destnode(buf_msg(_skb), dst);
> +                __skb_queue_tail(cpy, _skb);
> +        }
> +        return true;
> +}
> +
>  /* tipc_skb_queue_sorted(); sort pkt into list according to sequence number
>   * @list: list to be appended to
>   * @seqno: sequence number of buffer to add
> diff --git a/net/tipc/msg.h b/net/tipc/msg.h
> index 850ae0e..295e477 100644
> --- a/net/tipc/msg.h
> +++ b/net/tipc/msg.h
> @@ -631,14 +631,11 @@ static inline void msg_set_bc_netid(struct tipc_msg *m, u32 id)
>
>  static inline u32 msg_link_selector(struct tipc_msg *m)
>  {
> +        if (msg_user(m) == MSG_FRAGMENTER)
> +                m = (void *)msg_data(m);
>          return msg_bits(m, 4, 0, 1);
>  }
>
> -static inline void msg_set_link_selector(struct tipc_msg *m, u32 n)
> -{
> -        msg_set_bits(m, 4, 0, 1, n);
> -}
> -
>  /*
>   * Word 5
>   */
> @@ -835,6 +832,8 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
>                     int offset, int dsz, int mtu, struct sk_buff_head *list);
>  bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
>  bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);
> +bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
> +                        struct sk_buff_head *cpy);
>  void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
>                               struct sk_buff *skb);
>
> diff --git a/net/tipc/node.c b/net/tipc/node.c
> index 2883f6a..f96dacf 100644
> --- a/net/tipc/node.c
> +++ b/net/tipc/node.c
> @@ -1257,6 +1257,19 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
>          kfree_skb(skb);
>  }
>
> +static void tipc_node_mcast_rcv(struct tipc_node *n)
> +{
> +        struct tipc_bclink_entry *be = &n->bc_entry;
> +
> +        /* 'arrvq' is under inputq2's lock protection */
> +        spin_lock_bh(&be->inputq2.lock);
> +        spin_lock_bh(&be->inputq1.lock);
> +        skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
> +        spin_unlock_bh(&be->inputq1.lock);
> +        spin_unlock_bh(&be->inputq2.lock);
> +        tipc_sk_mcast_rcv(n->net, &be->arrvq, &be->inputq2);
> +}
> +
>  static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr,
>                                    int bearer_id, struct sk_buff_head *xmitq)
>  {
> @@ -1330,15 +1343,8 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
>          if (!skb_queue_empty(&xmitq))
>                  tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
>
> -        /* Deliver. 'arrvq' is under inputq2's lock protection */
> -        if (!skb_queue_empty(&be->inputq1)) {
> -                spin_lock_bh(&be->inputq2.lock);
> -                spin_lock_bh(&be->inputq1.lock);
> -                skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
> -                spin_unlock_bh(&be->inputq1.lock);
> -                spin_unlock_bh(&be->inputq2.lock);
> -                tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2);
> -        }
> +        if (!skb_queue_empty(&be->inputq1))
> +                tipc_node_mcast_rcv(n);
>
>          if (rc & TIPC_LINK_DOWN_EVT) {
>                  /* Reception reassembly failure => reset all links to peer */
> @@ -1565,6 +1571,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
>          if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
>                  tipc_named_rcv(net, &n->bc_entry.namedq);
>
> +        if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1)))
> +                tipc_node_mcast_rcv(n);
> +
>          if (!skb_queue_empty(&le->inputq))
>                  tipc_sk_rcv(net, &le->inputq);
>
> diff --git a/net/tipc/socket.c b/net/tipc/socket.c
> index b456265..1f948e6 100644
> --- a/net/tipc/socket.c
> +++ b/net/tipc/socket.c
> @@ -740,32 +740,43 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
>          struct tipc_msg *hdr = &tsk->phdr;
>          struct net *net = sock_net(sk);
>          int mtu = tipc_bcast_get_mtu(net);
> +        u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE);
>          struct sk_buff_head pkts;
> +        struct tipc_nlist dsts;
>          int rc;
>
> +        /* Block or return if any destination link is congested */
>          rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
>          if (unlikely(rc))
>                  return rc;
>
> +        /* Lookup destination nodes */
> +        tipc_nlist_init(&dsts, tipc_own_addr(net));
> +        tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower,
> +                                      seq->upper, domain, &dsts);
> +        if (!dsts.local && !dsts.remote)
> +                return -EHOSTUNREACH;
> +
> +        /* Build message header */
>          msg_set_type(hdr, TIPC_MCAST_MSG);
> +        msg_set_hdr_sz(hdr, MCAST_H_SIZE);
>          msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
>          msg_set_destport(hdr, 0);
>          msg_set_destnode(hdr, 0);
>          msg_set_nametype(hdr, seq->type);
>          msg_set_namelower(hdr, seq->lower);
>          msg_set_nameupper(hdr, seq->upper);
> -        msg_set_hdr_sz(hdr, MCAST_H_SIZE);
>
> +        /* Build message as chain of buffers */
>          skb_queue_head_init(&pkts);
>          rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
> -        if (unlikely(rc != dlen))
> -                return rc;
>
> -        rc = tipc_bcast_xmit(net, &pkts);
> -        if (unlikely(rc == -ELINKCONG)) {
> -                tsk->cong_link_cnt = 1;
> -                rc = 0;
> -        }
> +        /* Send message if build was successful */
> +        if (unlikely(rc == dlen))
> +                rc = tipc_mcast_xmit(net, &pkts, &dsts,
> +                                     &tsk->cong_link_cnt);
> +
> +        tipc_nlist_purge(&dsts);
>
>          return rc ? rc : dlen;
>  }
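As a side note for readers of the thread: the 'replicast' service described in the commit message boils down to copying the packet chain once per identified destination node, stamping each copy with that destination, and sending it over the corresponding unicast link (tipc_rcast_xmit() and tipc_msg_pskb_copy() above). The stand-alone C sketch below illustrates only that fan-out pattern; it is not the kernel code, and the helper names msg_copy_for_dest() and unicast_send() are invented for the example.

/* Illustrative user-space sketch of the "replicast" fan-out: one copy of
 * the message per destination, each copy stamped with its destination and
 * handed to a per-destination unicast send path. All names are invented.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

struct msg {
        unsigned int destnode;          /* filled in per copy */
        char payload[64];
};

/* stand-in for pskb_copy() followed by msg_set_destnode() */
static struct msg *msg_copy_for_dest(const struct msg *m, unsigned int dst)
{
        struct msg *cpy = malloc(sizeof(*cpy));

        if (!cpy)
                return NULL;
        memcpy(cpy, m, sizeof(*cpy));
        cpy->destnode = dst;
        return cpy;
}

/* stand-in for tipc_node_xmit(); always succeeds in this sketch */
static int unicast_send(const struct msg *m)
{
        printf("unicast to node 0x%x: %s\n", m->destnode, m->payload);
        return 0;
}

int main(void)
{
        unsigned int dests[] = { 0x1001001, 0x1001002, 0x1001003 };
        struct msg m = { .destnode = 0, .payload = "multicast payload" };
        int cong_cnt = 0;
        size_t i;

        for (i = 0; i < sizeof(dests) / sizeof(dests[0]); i++) {
                struct msg *cpy = msg_copy_for_dest(&m, dests[i]);

                if (!cpy)
                        return 1;       /* -ENOMEM in the real code */
                if (unicast_send(cpy) < 0)
                        cong_cnt++;     /* mirrors (*cong_link_cnt)++ */
                free(cpy);
        }
        printf("congested links: %d\n", cong_cnt);
        return 0;
}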