> -----Original Message-----
> From: Parthasarathy Bhuvaragan
> Sent: Friday, 13 January, 2017 04:24
> To: Jon Maloy <jon.ma...@ericsson.com>; tipc-discussion@lists.sourceforge.net;
> Ying Xue <ying....@windriver.com>
> Subject: Re: [tipc-discussion] [net-next v3 3/4] tipc: introduce replicast as
> transport option for multicast
> 
> On 01/04/2017 06:05 PM, Parthasarathy Bhuvaragan wrote:
> > Hi Jon,
> >
> > Added some minor comments inline in this patch, apart from that the
> > major concern is the following:
> >
> > All my tests which passed before this patch, fails while sending
> > multicast to a receiver on own node.
> >
> > With this patch, we increase the likelihood of receive buffer overflow
> > when the sender and receivers run on the same host, since we bypass the
> > link layer completely. I confirmed this with some traces in filter_rcv().
> >
> > If I add another multicast listener running on another node, this
> > pacifies the sender (puts it to sleep at link congestion), and the
> > relatively slow link layer reduces the buffer overflow.
> >
> > We need to find a way to reduce the aggressiveness of the sender.
> > We want the location of services to be transparent to users, so we
> > should provide similar characteristics regardless of the service
> > location.
> >
> Jon, running the ptts server and client on a standalone node without your
> updates also failed. So in that respect, I am OK with this patch.
> 
> If the Ethernet bearer lacks broadcast ability, then neighbor discovery
> will not work. So do we intend to introduce support for adding Ethernet
> peers manually, as we do for UDP bearers? Otherwise we can never use
> replicast for non-UDP bearers.

I believe all Ethernet implementations, even overlay networks, provide some
form of broadcast or, lacking that, an emulated broadcast.
So, discovery should work, but it will be very inefficient when we do link
broadcast, because TIPC will think that genuine Ethernet broadcast is supported.
We actually need some way to find out what kind of "Ethernet" we are attached
to, e.g. VXLAN, so that the "bcast supported" flag can be set correctly.
I wonder if that is possible, or if it has to be configured.
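For clarity, here is a toy user-space sketch of the selection this flag drives. It is illustrative only, not kernel code: mcast_xmit(), MAX_DSTS and struct msg here are made-up names standing in for tipc_mcast_xmit(), bb->bcast_support and the sk_buff chain.

```c
/*
 * Illustrative user-space sketch (not kernel code) of the
 * bcast-vs-replicast decision made by tipc_mcast_xmit() in this patch.
 * 'bcast_support' stands in for bb->bcast_support; on an "Ethernet"
 * that only emulates broadcast (e.g. VXLAN), the flag should be false
 * so we fall back to one unicast copy per destination, as
 * tipc_rcast_xmit() does.
 */
#include <stdio.h>

#define MAX_DSTS 8

struct msg {
	char data[64];
	unsigned int dst;	/* destination node; 0 = broadcast */
};

/* Returns the number of wire copies produced in 'out'. */
static int mcast_xmit(const char *payload, const unsigned int *dsts,
		      int ndsts, int bcast_support, struct msg *out)
{
	int sent = 0;

	if (bcast_support) {
		/* Genuine broadcast: one wire copy reaches all nodes. */
		snprintf(out[0].data, sizeof(out[0].data), "%s", payload);
		out[0].dst = 0;
		return 1;
	}

	/* Replicast: replicate the message once per destination node,
	 * mirroring the pskb_copy() + msg_set_destnode() loop in
	 * tipc_rcast_xmit(). */
	for (int i = 0; i < ndsts && i < MAX_DSTS; i++) {
		snprintf(out[sent].data, sizeof(out[sent].data), "%s", payload);
		out[sent].dst = dsts[i];
		sent++;
	}
	return sent;
}
```

Whether 'bcast_support' can be derived automatically from the bearer type, or has to be configured, is exactly the open question.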

///jon

> 
> /Partha
> 
> > /Partha
> >
> > On 01/02/2017 03:34 PM, Parthasarathy Bhuvaragan wrote:
> >> Hi Jon,
> >>
> >> When I include this patch, ptts case 12 (multicast) fails when the
> >> client and server are running on the same node.
> >>
> >> /Partha
> >>
> >> On 12/22/2016 04:15 PM, Jon Maloy wrote:
> >>> TIPC multicast messages are currently carried over a reliable
> >>> 'broadcast link', making use of the underlying media's ability to
> >>> transport packets as L2 broadcast or IP multicast to all nodes in
> >>> the cluster.
> >>>
> >>> When the used bearer is lacking that ability, we can instead emulate
> >>> the broadcast service by replicating and sending the packets over as
> >>> many unicast links as needed to reach all identified destinations.
> >>> We now introduce a new TIPC link-level 'replicast' service that does
> >>> this.
> >>>
> >>> Signed-off-by: Jon Maloy <jon.ma...@ericsson.com>
> >>> ---
> >>>  net/tipc/bcast.c  | 105 ++++++++++++++++++++++++++++++++++++++++++------------
> >>>  net/tipc/bcast.h  |   3 +-
> >>>  net/tipc/link.c   |   8 ++++-
> >>>  net/tipc/msg.c    |  17 +++++++++
> >>>  net/tipc/msg.h    |   9 +++--
> >>>  net/tipc/node.c   |  27 +++++++++-----
> >>>  net/tipc/socket.c |  27 +++++++++-----
> >>>  7 files changed, 149 insertions(+), 47 deletions(-)
> >>>
> >>> diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
> >>> index 412d335..672e6ef 100644
> >>> --- a/net/tipc/bcast.c
> >>> +++ b/net/tipc/bcast.c
> >>> @@ -70,7 +70,7 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)
> >>>
> >>>  int tipc_bcast_get_mtu(struct net *net)
> >>>  {
> >>> - return tipc_link_mtu(tipc_bc_sndlink(net));
> >>> + return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE;
> >>>  }
> >>>
> >>>  /* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
> >>> @@ -175,42 +175,101 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
> >>>   __skb_queue_purge(&_xmitq);
> >>>  }
> >>>
> >>> -/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster
> >>> - *                    and to identified node local sockets
> >>> +/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes
> >>>   * @net: the applicable net namespace
> >>> - * @list: chain of buffers containing message
> >>> + * @pkts: chain of buffers containing message
> >>> + * @cong_link_cnt: set to 1 if broadcast link is congested, otherwise 0
> >>>   * Consumes the buffer chain.
> >>> - * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
> >>> + * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE
> >>>   */
> >>> -int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
> >>> +static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
> >>> +                    u16 *cong_link_cnt)
> >>>  {
> >>>   struct tipc_link *l = tipc_bc_sndlink(net);
> >>> - struct sk_buff_head xmitq, inputq, rcvq;
> >>> + struct sk_buff_head xmitq;
> >>>   int rc = 0;
> >>>
> >>> - __skb_queue_head_init(&rcvq);
> >>>   __skb_queue_head_init(&xmitq);
> >>> - skb_queue_head_init(&inputq);
> >>> -
> >>> - /* Prepare message clone for local node */
> >>> - if (unlikely(!tipc_msg_reassemble(list, &rcvq)))
> >>> -         return -EHOSTUNREACH;
> >>> -
> >>>   tipc_bcast_lock(net);
> >>>   if (tipc_link_bc_peers(l))
> >>> -         rc = tipc_link_xmit(l, list, &xmitq);
> >>> +         rc = tipc_link_xmit(l, pkts, &xmitq);
> >>>   tipc_bcast_unlock(net);
> >>> + tipc_bcbase_xmit(net, &xmitq);
> >>> + __skb_queue_purge(pkts);
> >>> + if (rc == -ELINKCONG) {
> >>> +         *cong_link_cnt = 1;
> >>> +         rc = 0;
> >>> + }
> >>> + return rc;
> >>> +}
> >>>
> >>> - /* Don't send to local node if adding to link failed */
> >>> - if (unlikely(rc && (rc != -ELINKCONG))) {
> >>> -         __skb_queue_purge(&rcvq);
> >>> -         return rc;
> >>> +/* tipc_rcast_xmit - replicate and send a message to given destination nodes
> >>> + * @net: the applicable net namespace
> >>> + * @pkts: chain of buffers containing message
> >>> + * @dests: list of destination nodes
> >>> + * @cong_link_cnt: returns number of congested links
> >>> + * @cong_links: returns identities of congested links
> > remove @cong_links
> >>> + * Returns 0 if success, otherwise errno
> >>> + */
> >>> +static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
> >>> +                    struct tipc_nlist *dests, u16 *cong_link_cnt)
> >>> +{
> >>> + struct sk_buff_head _pkts;
> >>> + struct u32_item *n, *tmp;
> >>> + u32 dst, selector;
> >>> +
> >>> + selector = msg_link_selector(buf_msg(skb_peek(pkts)));
> >>> + __skb_queue_head_init(&_pkts);
> >>> +
> >>> + list_for_each_entry_safe(n, tmp, &dests->list, list) {
> >>> +         dst = n->value;
> >>> +         if (!tipc_msg_pskb_copy(dst, pkts, &_pkts))
> >>> +                 return -ENOMEM;
> >>> +
> >>> +         /* Any other return value than -ELINKCONG is ignored */
> >>> +         if (tipc_node_xmit(net, &_pkts, dst, selector) == -ELINKCONG)
> >>> +                 (*cong_link_cnt)++;
> >>>   }
> >>> + return 0;
> >>> +}
> >>>
> >>> - /* Broadcast to all nodes, inluding local node */
> >>> - tipc_bcbase_xmit(net, &xmitq);
> >>> - tipc_sk_mcast_rcv(net, &rcvq, &inputq);
> >>> - __skb_queue_purge(list);
> >>> +/* tipc_mcast_xmit - deliver message to indicated destination nodes
> >>> + *                   and to identified node local sockets
> >>> + * @net: the applicable net namespace
> >>> + * @pkts: chain of buffers containing message
> >>> + * @dests: destination nodes for message. Not consumed.
> >>> + * @cong_link_cnt: returns number of encountered congested destination links
> >>> + * @cong_links: returns identities of congested links
> > remove @cong_links
> >>> + * Consumes buffer chain.
> >>> + * Returns 0 if success, otherwise errno
> >>> + */
> >>> +int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
> >>> +             struct tipc_nlist *dests, u16 *cong_link_cnt)
> >>> +{
> >>> + struct tipc_bc_base *bb = tipc_bc_base(net);
> >>> + struct sk_buff_head inputq, localq;
> >>> + int rc = 0;
> >>> +
> >>> + skb_queue_head_init(&inputq);
> >>> + skb_queue_head_init(&localq);
> >>> +
> >>> + /* Clone packets before they are consumed by next call */
> >>> + if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
> >>> +         rc = -ENOMEM;
> >>> +         goto exit;
> >>> + }
> >>> +
> >>> + if (dests->remote) {
> >>> +         if (!bb->bcast_support)
> >>> +                 rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
> >>> +         else
> >>> +                 rc = tipc_bcast_xmit(net, pkts, cong_link_cnt);
> >>> + }
> >>> +
> >>> + if (dests->local)
> >>> +         tipc_sk_mcast_rcv(net, &localq, &inputq);
> >>> +exit:
> >>> + __skb_queue_purge(pkts);
> >>>   return rc;
> >>>  }
> >>>
> >>> diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
> >>> index 18f3791..dd772e6 100644
> >>> --- a/net/tipc/bcast.h
> >>> +++ b/net/tipc/bcast.h
> >>> @@ -66,7 +66,8 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl);
> >>>  void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);
> >>>  void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);
> >>>  int  tipc_bcast_get_mtu(struct net *net);
> >>> -int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
> >>> +int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
> >>> +             struct tipc_nlist *dests, u16 *cong_link_cnt);
> >>>  int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
> >>>  void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
> >>>                   struct tipc_msg *hdr);
> >>> diff --git a/net/tipc/link.c b/net/tipc/link.c
> >>> index b758ca8..d1766ce 100644
> >>> --- a/net/tipc/link.c
> >>> +++ b/net/tipc/link.c
> >>> @@ -1032,11 +1032,17 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
> >>>  static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
> >>>                       struct sk_buff_head *inputq)
> >>>  {
> >>> - switch (msg_user(buf_msg(skb))) {
> >>> + struct tipc_msg *hdr = buf_msg(skb);
> >>> +
> >>> + switch (msg_user(hdr)) {
> >>>   case TIPC_LOW_IMPORTANCE:
> >>>   case TIPC_MEDIUM_IMPORTANCE:
> >>>   case TIPC_HIGH_IMPORTANCE:
> >>>   case TIPC_CRITICAL_IMPORTANCE:
> >>> +         if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) {
> >>> +                 skb_queue_tail(l->bc_rcvlink->inputq, skb);
> >>> +                 return true;
> >>> +         }
> >>>   case CONN_MANAGER:
> >>>           skb_queue_tail(inputq, skb);
> >>>           return true;
> >>> diff --git a/net/tipc/msg.c b/net/tipc/msg.c
> >>> index 17201aa..f4e6197 100644
> >>> --- a/net/tipc/msg.c
> >>> +++ b/net/tipc/msg.c
> >>> @@ -607,6 +607,23 @@ bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq)
> >>>   return false;
> >>>  }
> >>>
> >>> +bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
> >>> +                 struct sk_buff_head *cpy)
> >>> +{
> >>> + struct sk_buff *skb, *_skb;
> >>> +
> >>> + skb_queue_walk(msg, skb) {
> >>> +         _skb = pskb_copy(skb, GFP_ATOMIC);
> >>> +         if (!_skb) {
> >>> +                 __skb_queue_purge(cpy);
> >>> +                 return false;
> >>> +         }
> >>> +         msg_set_destnode(buf_msg(_skb), dst);
> >>> +         __skb_queue_tail(cpy, _skb);
> >>> + }
> >>> + return true;
> >>> +}
> >>> +
> >>>  /* tipc_skb_queue_sorted(); sort pkt into list according to sequence number
> >>>   * @list: list to be appended to
> >>>   * @seqno: sequence number of buffer to add
> >>> diff --git a/net/tipc/msg.h b/net/tipc/msg.h
> >>> index 850ae0e..295e477 100644
> >>> --- a/net/tipc/msg.h
> >>> +++ b/net/tipc/msg.h
> >>> @@ -631,14 +631,11 @@ static inline void msg_set_bc_netid(struct tipc_msg *m, u32 id)
> >>>
> >>>  static inline u32 msg_link_selector(struct tipc_msg *m)
> >>>  {
> >>> + if (msg_user(m) == MSG_FRAGMENTER)
> >>> +         m = (void *)msg_data(m);
> >>>   return msg_bits(m, 4, 0, 1);
> >>>  }
> >>>
> >>> -static inline void msg_set_link_selector(struct tipc_msg *m, u32 n)
> >>> -{
> >>> - msg_set_bits(m, 4, 0, 1, n);
> >>> -}
> >>> -
> >>>  /*
> >>>   * Word 5
> >>>   */
> >>> @@ -835,6 +832,8 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
> >>>              int offset, int dsz, int mtu, struct sk_buff_head *list);
> >>>  bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
> >>>  bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);
> >>> +bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
> >>> +                 struct sk_buff_head *cpy);
> >>>  void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
> >>>                        struct sk_buff *skb);
> >>>
> >>> diff --git a/net/tipc/node.c b/net/tipc/node.c
> >>> index 2883f6a..f96dacf 100644
> >>> --- a/net/tipc/node.c
> >>> +++ b/net/tipc/node.c
> >>> @@ -1257,6 +1257,19 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
> >>>   kfree_skb(skb);
> >>>  }
> >>>
> >>> +static void tipc_node_mcast_rcv(struct tipc_node *n)
> >>> +{
> >>> + struct tipc_bclink_entry *be = &n->bc_entry;
> >>> +
> >>> + /* 'arrvq' is under inputq2's lock protection */
> >>> + spin_lock_bh(&be->inputq2.lock);
> >>> + spin_lock_bh(&be->inputq1.lock);
> >>> + skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
> >>> + spin_unlock_bh(&be->inputq1.lock);
> >>> + spin_unlock_bh(&be->inputq2.lock);
> >>> + tipc_sk_mcast_rcv(n->net, &be->arrvq, &be->inputq2);
> >>> +}
> >>> +
> >>>  static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr,
> >>>                             int bearer_id, struct sk_buff_head *xmitq)
> >>>  {
> >>> @@ -1330,15 +1343,8 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
> >>>   if (!skb_queue_empty(&xmitq))
> >>>           tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
> >>>
> >>> - /* Deliver. 'arrvq' is under inputq2's lock protection */
> >>> - if (!skb_queue_empty(&be->inputq1)) {
> >>> -         spin_lock_bh(&be->inputq2.lock);
> >>> -         spin_lock_bh(&be->inputq1.lock);
> >>> -         skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
> >>> -         spin_unlock_bh(&be->inputq1.lock);
> >>> -         spin_unlock_bh(&be->inputq2.lock);
> >>> -         tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2);
> >>> - }
> >>> + if (!skb_queue_empty(&be->inputq1))
> >>> +         tipc_node_mcast_rcv(n);
> >>>
> >>>   if (rc & TIPC_LINK_DOWN_EVT) {
> >>>           /* Reception reassembly failure => reset all links to peer */
> >>> @@ -1565,6 +1571,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
> >>>   if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
> >>>           tipc_named_rcv(net, &n->bc_entry.namedq);
> >>>
> >>> + if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1)))
> >>> +         tipc_node_mcast_rcv(n);
> >>> +
> >>>   if (!skb_queue_empty(&le->inputq))
> >>>           tipc_sk_rcv(net, &le->inputq);
> >>>
> >>> diff --git a/net/tipc/socket.c b/net/tipc/socket.c
> >>> index b456265..1f948e6 100644
> >>> --- a/net/tipc/socket.c
> >>> +++ b/net/tipc/socket.c
> >>> @@ -740,32 +740,43 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
> >>>   struct tipc_msg *hdr = &tsk->phdr;
> >>>   struct net *net = sock_net(sk);
> >>>   int mtu = tipc_bcast_get_mtu(net);
> >>> + u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE);
> >>>   struct sk_buff_head pkts;
> >>> + struct tipc_nlist dsts;
> >>>   int rc;
> >>>
> >>> + /* Block or return if any destination link is congested */
> >>>   rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
> >>>   if (unlikely(rc))
> >>>           return rc;
> >>>
> >>> + /* Lookup destination nodes */
> >>> + tipc_nlist_init(&dsts, tipc_own_addr(net));
> >>> + tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower,
> >>> +                               seq->upper, domain, &dsts);
> >>> + if (!dsts.local && !dsts.remote)
> >>> +         return -EHOSTUNREACH;
> >>> +
> >>> + /* Build message header */
> >>>   msg_set_type(hdr, TIPC_MCAST_MSG);
> >>> + msg_set_hdr_sz(hdr, MCAST_H_SIZE);
> >>>   msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
> >>>   msg_set_destport(hdr, 0);
> >>>   msg_set_destnode(hdr, 0);
> >>>   msg_set_nametype(hdr, seq->type);
> >>>   msg_set_namelower(hdr, seq->lower);
> >>>   msg_set_nameupper(hdr, seq->upper);
> >>> - msg_set_hdr_sz(hdr, MCAST_H_SIZE);
> >>>
> >>> + /* Build message as chain of buffers */
> >>>   skb_queue_head_init(&pkts);
> >>>   rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
> >>> - if (unlikely(rc != dlen))
> > if(likely(rc == dlen))
> >>> -         return rc;
> >>>
> >>> - rc = tipc_bcast_xmit(net, &pkts);
> >>> - if (unlikely(rc == -ELINKCONG)) {
> >>> -         tsk->cong_link_cnt = 1;
> >>> -         rc = 0;
> >>> - }
> >>> + /* Send message if build was successful */
> >>> + if (unlikely(rc == dlen))
> >>> +         rc = tipc_mcast_xmit(net, &pkts, &dsts,
> >>> +                              &tsk->cong_link_cnt);
> >>> +
> >>> + tipc_nlist_purge(&dsts);
> >>>
> >>>   return rc ? rc : dlen;
> >>>  }
> >>>
> >>
> >> _______________________________________________
> >> tipc-discussion mailing list
> >> tipc-discussion@lists.sourceforge.net
> >> https://lists.sourceforge.net/lists/listinfo/tipc-discussion
> >>
> >

