Forget about my previous mail. As I said, I was confused, but I think I
understand it now.

///jon


> -----Original Message-----
> From: Jon Maloy
> Sent: Friday, 23 December, 2016 09:40
> To: Jon Maloy <jon.ma...@ericsson.com>; tipc-discussion@lists.sourceforge.net;
> Parthasarathy Bhuvaragan <parthasarathy.bhuvara...@ericsson.com>; Ying Xue
> <ying....@windriver.com>
> Cc: ma...@donjonn.com; erik.hu...@gmail.com
> Subject: RE: [net-next v4 3/3] tipc: reduce risk of user starvation during
> link congestion
> 
> See below.
> ///jon
> 
> > -----Original Message-----
> > From: Jon Maloy [mailto:jon.ma...@ericsson.com]
> > Sent: Thursday, 22 December, 2016 09:51
> > To: tipc-discussion@lists.sourceforge.net; Parthasarathy Bhuvaragan
> > <parthasarathy.bhuvara...@ericsson.com>; Ying Xue
> 
> [...]
> 
> >  }
> >
> > -static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t 
> > dsz)
> > +static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t 
> > dlen)
> >  {
> > -   DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
> >     struct sock *sk = sock->sk;
> > -   struct tipc_sock *tsk = tipc_sk(sk);
> >     struct net *net = sock_net(sk);
> > -   struct tipc_msg *mhdr = &tsk->phdr;
> > -   u32 dnode, dport;
> > -   struct sk_buff_head pktchain;
> > -   bool is_connectionless = tipc_sk_type_connectionless(sk);
> > -   struct sk_buff *skb;
> > +   struct tipc_sock *tsk = tipc_sk(sk);
> > +   DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
> > +   long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
> > +   struct list_head *clinks = &tsk->cong_links;
> > +   bool syn = !tipc_sk_type_connectionless(sk);
> > +   struct tipc_msg *hdr = &tsk->phdr;
> >     struct tipc_name_seq *seq;
> > -   struct iov_iter save;
> > -   u32 mtu;
> > -   long timeo;
> > -   int rc;
> > +   struct sk_buff_head pkts;
> > +   u32 type, inst, domain;
> > +   u32 dnode, dport;
> > +   int mtu, rc;
> >
> > -   if (dsz > TIPC_MAX_USER_MSG_SIZE)
> > +   if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
> >             return -EMSGSIZE;
> > +
> >     if (unlikely(!dest)) {
> > -           if (is_connectionless && tsk->peer.family == AF_TIPC)
> > -                   dest = &tsk->peer;
> > -           else
> > +           dest = &tsk->peer;
> > +           if (!syn || dest->family != AF_TIPC)
> >                     return -EDESTADDRREQ;
> > -   } else if (unlikely(m->msg_namelen < sizeof(*dest)) ||
> > -              dest->family != AF_TIPC) {
> > -           return -EINVAL;
> >     }
> > -   if (!is_connectionless) {
> > +
> > +   if (unlikely(m->msg_namelen < sizeof(*dest)))
> > +           return -EINVAL;
> > +
> > +   if (unlikely(dest->family != AF_TIPC))
> > +           return -EINVAL;
> > +
> > +   if (unlikely(syn)) {
> 
> This one confuses me. I cannot think of any setup scenario where a SYN would
> be sent this way. But I believe that Erik at some point added the ability to
> do connect() on datagram sockets, i.e. similar to the way you can do connect()
> on UDP. Is this it? If so, is it correct to test for !is_connectionless() as
> we do?
> In brief, does anybody understand this part of the code?
> 
> ///jon
> 
> 
> >             if (sk->sk_state == TIPC_LISTEN)
> >                     return -EPIPE;
> >             if (sk->sk_state != TIPC_OPEN)
> > @@ -938,72 +934,62 @@ static int __tipc_sendmsg(struct socket *sock, struct
> > msghdr *m, size_t dsz)
> >                     tsk->conn_instance = dest->addr.name.name.instance;
> >             }
> >     }
> > -   seq = &dest->addr.nameseq;
> > -   timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
> >
> > -   if (dest->addrtype == TIPC_ADDR_MCAST) {
> > -           return tipc_sendmcast(sock, seq, m, dsz, timeo);
> > -   } else if (dest->addrtype == TIPC_ADDR_NAME) {
> > -           u32 type = dest->addr.name.name.type;
> > -           u32 inst = dest->addr.name.name.instance;
> > -           u32 domain = dest->addr.name.domain;
> > +   seq = &dest->addr.nameseq;
> > +   if (dest->addrtype == TIPC_ADDR_MCAST)
> > +           return tipc_sendmcast(sock, seq, m, dlen, timeout);
> >
> > +   if (dest->addrtype == TIPC_ADDR_NAME) {
> > +           type = dest->addr.name.name.type;
> > +           inst = dest->addr.name.name.instance;
> > +           domain = dest->addr.name.domain;
> >             dnode = domain;
> > -           msg_set_type(mhdr, TIPC_NAMED_MSG);
> > -           msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
> > -           msg_set_nametype(mhdr, type);
> > -           msg_set_nameinst(mhdr, inst);
> > -           msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
> > +           msg_set_type(hdr, TIPC_NAMED_MSG);
> > +           msg_set_hdr_sz(hdr, NAMED_H_SIZE);
> > +           msg_set_nametype(hdr, type);
> > +           msg_set_nameinst(hdr, inst);
> > +           msg_set_lookup_scope(hdr, tipc_addr_scope(domain));
> >             dport = tipc_nametbl_translate(net, type, inst, &dnode);
> > -           msg_set_destnode(mhdr, dnode);
> > -           msg_set_destport(mhdr, dport);
> > +           msg_set_destnode(hdr, dnode);
> > +           msg_set_destport(hdr, dport);
> >             if (unlikely(!dport && !dnode))
> >                     return -EHOSTUNREACH;
> > +
> >     } else if (dest->addrtype == TIPC_ADDR_ID) {
> >             dnode = dest->addr.id.node;
> > -           msg_set_type(mhdr, TIPC_DIRECT_MSG);
> > -           msg_set_lookup_scope(mhdr, 0);
> > -           msg_set_destnode(mhdr, dnode);
> > -           msg_set_destport(mhdr, dest->addr.id.ref);
> > -           msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
> > +           msg_set_type(hdr, TIPC_DIRECT_MSG);
> > +           msg_set_lookup_scope(hdr, 0);
> > +           msg_set_destnode(hdr, dnode);
> > +           msg_set_destport(hdr, dest->addr.id.ref);
> > +           msg_set_hdr_sz(hdr, BASIC_H_SIZE);
> >     }
> >
> > -   skb_queue_head_init(&pktchain);
> > -   save = m->msg_iter;
> > -new_mtu:
> > +   /* Block or return if destination link is congested */
> > +   rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode));
> > +   if (unlikely(rc))
> > +           return rc;
> > +
> > +   skb_queue_head_init(&pkts);
> >     mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
> > -   rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain);
> > -   if (rc < 0)
> > +   rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
> > +   if (unlikely(rc != dlen))
> >             return rc;
> >
> > -   do {
> > -           skb = skb_peek(&pktchain);
> > -           TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
> > -           rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid);
> > -           if (likely(!rc)) {
> > -                   if (!is_connectionless)
> > -                           tipc_set_sk_state(sk, TIPC_CONNECTING);
> > -                   return dsz;
> > -           }
> > -           if (rc == -ELINKCONG) {
> > -                   tsk->link_cong = 1;
> > -                   rc = tipc_wait_for_cond(sock, &timeo, !tsk->link_cong);
> > -                   if (!rc)
> > -                           continue;
> > -           }
> > -           __skb_queue_purge(&pktchain);
> > -           if (rc == -EMSGSIZE) {
> > -                   m->msg_iter = save;
> > -                   goto new_mtu;
> > -           }
> > -           break;
> > -   } while (1);
> > +   rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
> > +   if (unlikely(rc == -ELINKCONG)) {
> > +           u32_push(clinks, dnode);
> > +           tsk->cong_link_cnt++;
> > +           rc = 0;
> > +   }
> >
> > -   return rc;
> > +   if (unlikely(syn && !rc))
> > +           tipc_set_sk_state(sk, TIPC_CONNECTING);
> > +
> > +   return rc ? rc : dlen;
> >  }
> >
> >  /**
> > - * tipc_send_stream - send stream-oriented data
> > + * tipc_sendstream - send stream-oriented data
> >   * @sock: socket structure
> >   * @m: data to send
> >   * @dsz: total length of data to be transmitted
> > @@ -1013,97 +999,69 @@ static int __tipc_sendmsg(struct socket *sock, struct
> > msghdr *m, size_t dsz)
> >   * Returns the number of bytes sent on success (or partial success),
> >   * or errno if no data sent
> >   */
> > -static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t 
> > dsz)
> > +static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t 
> > dsz)
> >  {
> >     struct sock *sk = sock->sk;
> >     int ret;
> >
> >     lock_sock(sk);
> > -   ret = __tipc_send_stream(sock, m, dsz);
> > +   ret = __tipc_sendstream(sock, m, dsz);
> >     release_sock(sk);
> >
> >     return ret;
> >  }
> >
> > -static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t
> dsz)
> > +static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t
> dlen)
> >  {
> >     struct sock *sk = sock->sk;
> > -   struct net *net = sock_net(sk);
> > -   struct tipc_sock *tsk = tipc_sk(sk);
> > -   struct tipc_msg *mhdr = &tsk->phdr;
> > -   struct sk_buff_head pktchain;
> >     DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
> > -   u32 portid = tsk->portid;
> > -   int rc = -EINVAL;
> > -   long timeo;
> > -   u32 dnode;
> > -   uint mtu, send, sent = 0;
> > -   struct iov_iter save;
> > -   int hlen = MIN_H_SIZE;
> > -
> > -   /* Handle implied connection establishment */
> > -   if (unlikely(dest)) {
> > -           rc = __tipc_sendmsg(sock, m, dsz);
> > -           hlen = msg_hdr_sz(mhdr);
> > -           if (dsz && (dsz == rc))
> > -                   tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
> > -           return rc;
> > -   }
> > -   if (dsz > (uint)INT_MAX)
> > -           return -EMSGSIZE;
> > -
> > -   if (unlikely(!tipc_sk_connected(sk))) {
> > -           if (sk->sk_state == TIPC_DISCONNECTING)
> > -                   return -EPIPE;
> > -           else
> > -                   return -ENOTCONN;
> > -   }
> > +   long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
> > +   struct tipc_sock *tsk = tipc_sk(sk);
> > +   struct tipc_msg *hdr = &tsk->phdr;
> > +   struct net *net = sock_net(sk);
> > +   struct sk_buff_head pkts;
> > +   u32 dnode = tsk_peer_node(tsk);
> > +   int send, sent = 0;
> > +   int rc = 0;
> >
> > -   timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
> > -   if (!timeo && tsk->link_cong)
> > -           return -ELINKCONG;
> > +   skb_queue_head_init(&pkts);
> >
> > -   dnode = tsk_peer_node(tsk);
> > -   skb_queue_head_init(&pktchain);
> > +   if (unlikely(dlen > INT_MAX))
> > +           return -EMSGSIZE;
> >
> > -next:
> > -   save = m->msg_iter;
> > -   mtu = tsk->max_pkt;
> > -   send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
> > -   rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain);
> > -   if (unlikely(rc < 0))
> > +   /* Handle implicit connection setup */
> > +   if (unlikely(dest)) {
> > +           rc = __tipc_sendmsg(sock, m, dlen);
> > +           if (dlen && (dlen == rc))
> > +                   tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
> >             return rc;
> > +   }
> >
> >     do {
> > -           if (likely(!tsk_conn_cong(tsk))) {
> > -                   rc = tipc_node_xmit(net, &pktchain, dnode, portid);
> > -                   if (likely(!rc)) {
> > -                           tsk->snt_unacked += tsk_inc(tsk, send + hlen);
> > -                           sent += send;
> > -                           if (sent == dsz)
> > -                                   return dsz;
> > -                           goto next;
> > -                   }
> > -                   if (rc == -EMSGSIZE) {
> > -                           __skb_queue_purge(&pktchain);
> > -                           tsk->max_pkt = tipc_node_get_mtu(net, dnode,
> > -                                                            portid);
> > -                           m->msg_iter = save;
> > -                           goto next;
> > -                   }
> > -                   if (rc != -ELINKCONG)
> > -                           break;
> > -
> > -                   tsk->link_cong = 1;
> > -           }
> > -           rc = tipc_wait_for_cond(sock, &timeo,
> > -                                   (!tsk->link_cong &&
> > +           rc = tipc_wait_for_cond(sock, &timeout,
> > +                                   (!tsk->cong_link_cnt &&
> >                                      !tsk_conn_cong(tsk) &&
> >                                      tipc_sk_connected(sk)));
> > -   } while (!rc);
> > +           if (unlikely(rc))
> > +                   break;
> > +
> > +           send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
> > +           rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts);
> > +           if (unlikely(rc != send))
> > +                   break;
> > +
> > +           rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
> > +           if (unlikely(rc == -ELINKCONG)) {
> > +                   tsk->cong_link_cnt = 1;
> > +                   rc = 0;
> > +           }
> > +           if (likely(!rc)) {
> > +                   tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE);
> > +                   sent += send;
> > +           }
> > +   } while (sent < dlen && !rc);
> >
> > -   __skb_queue_purge(&pktchain);
> > -   return sent ? sent : rc;
> > +   return rc ? rc : sent;
> >  }
> >
> >  /**
> > @@ -1121,7 +1079,7 @@ static int tipc_send_packet(struct socket *sock, 
> > struct
> > msghdr *m, size_t dsz)
> >     if (dsz > TIPC_MAX_USER_MSG_SIZE)
> >             return -EMSGSIZE;
> >
> > -   return tipc_send_stream(sock, m, dsz);
> > +   return tipc_sendstream(sock, m, dsz);
> >  }
> >
> >  /* tipc_sk_finish_conn - complete the setup of a connection
> > @@ -1688,6 +1646,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff
> > *skb,
> >     unsigned int limit = rcvbuf_limit(sk, skb);
> >     int err = TIPC_OK;
> >     int usr = msg_user(hdr);
> > +   u32 onode;
> >
> >     if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
> >             tipc_sk_proto_rcv(tsk, skb, xmitq);
> > @@ -1695,8 +1654,10 @@ static bool filter_rcv(struct sock *sk, struct 
> > sk_buff
> > *skb,
> >     }
> >
> >     if (unlikely(usr == SOCK_WAKEUP)) {
> > +           onode = msg_orignode(hdr);
> >             kfree_skb(skb);
> > -           tsk->link_cong = 0;
> > +           u32_del(&tsk->cong_links, onode);
> > +           tsk->cong_link_cnt--;
> >             sk->sk_write_space(sk);
> >             return false;
> >     }
> > @@ -2104,7 +2065,7 @@ static int tipc_accept(struct socket *sock, struct 
> > socket
> > *new_sock, int flags)
> >             struct msghdr m = {NULL,};
> >
> >             tsk_advance_rx_queue(sk);
> > -           __tipc_send_stream(new_sock, &m, 0);
> > +           __tipc_sendstream(new_sock, &m, 0);
> >     } else {
> >             __skb_dequeue(&sk->sk_receive_queue);
> >             __skb_queue_head(&new_sk->sk_receive_queue, buf);
> > @@ -2565,7 +2526,7 @@ static const struct proto_ops stream_ops = {
> >     .shutdown       = tipc_shutdown,
> >     .setsockopt     = tipc_setsockopt,
> >     .getsockopt     = tipc_getsockopt,
> > -   .sendmsg        = tipc_send_stream,
> > +   .sendmsg        = tipc_sendstream,
> >     .recvmsg        = tipc_recv_stream,
> >     .mmap           = sock_no_mmap,
> >     .sendpage       = sock_no_sendpage
> > --
> > 2.7.4


------------------------------------------------------------------------------
Developer Access Program for Intel Xeon Phi Processors
Access to Intel Xeon Phi processor-based developer platforms.
With one year of Intel Parallel Studio XE.
Training and support from Colfax.
Order your platform today.http://sdm.link/intel
_______________________________________________
tipc-discussion mailing list
tipc-discussion@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to