Forget about previous mail. As I said, I was confused, but I think I understand it now.
///jon > -----Original Message----- > From: Jon Maloy > Sent: Friday, 23 December, 2016 09:40 > To: Jon Maloy <jon.ma...@ericsson.com>; tipc-discussion@lists.sourceforge.net; > Parthasarathy Bhuvaragan <parthasarathy.bhuvara...@ericsson.com>; Ying Xue > <ying....@windriver.com> > Cc: ma...@donjonn.com; erik.hu...@gmail.com > Subject: RE: [net-next v4 3/3] tipc: reduce risk of user starvation during > link > congestion > > See below. > ///jon > > > -----Original Message----- > > From: Jon Maloy [mailto:jon.ma...@ericsson.com] > > Sent: Thursday, 22 December, 2016 09:51 > > To: tipc-discussion@lists.sourceforge.net; Parthasarathy Bhuvaragan > > <parthasarathy.bhuvara...@ericsson.com>; Ying Xue > > [...] > > > } > > > > -static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t > > dsz) > > +static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t > > dlen) > > { > > - DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); > > struct sock *sk = sock->sk; > > - struct tipc_sock *tsk = tipc_sk(sk); > > struct net *net = sock_net(sk); > > - struct tipc_msg *mhdr = &tsk->phdr; > > - u32 dnode, dport; > > - struct sk_buff_head pktchain; > > - bool is_connectionless = tipc_sk_type_connectionless(sk); > > - struct sk_buff *skb; > > + struct tipc_sock *tsk = tipc_sk(sk); > > + DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); > > + long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); > > + struct list_head *clinks = &tsk->cong_links; > > + bool syn = !tipc_sk_type_connectionless(sk); > > + struct tipc_msg *hdr = &tsk->phdr; > > struct tipc_name_seq *seq; > > - struct iov_iter save; > > - u32 mtu; > > - long timeo; > > - int rc; > > + struct sk_buff_head pkts; > > + u32 type, inst, domain; > > + u32 dnode, dport; > > + int mtu, rc; > > > > - if (dsz > TIPC_MAX_USER_MSG_SIZE) > > + if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE)) > > return -EMSGSIZE; > > + > > if (unlikely(!dest)) { > > - if (is_connectionless && tsk->peer.family == AF_TIPC) > > - dest = &tsk->peer; > > - else > > + dest = &tsk->peer; > > + if (!syn || dest->family != AF_TIPC) > > return -EDESTADDRREQ; > > - } else if (unlikely(m->msg_namelen < sizeof(*dest)) || > > - dest->family != AF_TIPC) { > > - return -EINVAL; > > } > > - if (!is_connectionless) { > > + > > + if (unlikely(m->msg_namelen < sizeof(*dest))) > > + return -EINVAL; > > + > > + if (unlikely(dest->family != AF_TIPC)) > > + return -EINVAL; > > + > > + if (unlikely(syn)) { > > This one confuses me. I cannot think of any setup scenario where a SYN would > be > sent this way. But I believe that Erik at some moment added ability to do > connect() on datagram sockets, i.e. similar to the way you can do connect() on > UDP. Is this it? If, so, is it correct to test for !is_connectionless() as we > do? > Brief, does anybody understand this part of the code ? > > ///jon > > > > if (sk->sk_state == TIPC_LISTEN) > > return -EPIPE; > > if (sk->sk_state != TIPC_OPEN) > > @@ -938,72 +934,62 @@ static int __tipc_sendmsg(struct socket *sock, struct > > msghdr *m, size_t dsz) > > tsk->conn_instance = dest->addr.name.name.instance; > > } > > } > > - seq = &dest->addr.nameseq; > > - timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); > > > > - if (dest->addrtype == TIPC_ADDR_MCAST) { > > - return tipc_sendmcast(sock, seq, m, dsz, timeo); > > - } else if (dest->addrtype == TIPC_ADDR_NAME) { > > - u32 type = dest->addr.name.name.type; > > - u32 inst = dest->addr.name.name.instance; > > - u32 domain = dest->addr.name.domain; > > + seq = &dest->addr.nameseq; > > + if (dest->addrtype == TIPC_ADDR_MCAST) > > + return tipc_sendmcast(sock, seq, m, dlen, timeout); > > > > + if (dest->addrtype == TIPC_ADDR_NAME) { > > + type = dest->addr.name.name.type; > > + inst = dest->addr.name.name.instance; > > + domain = dest->addr.name.domain; > > dnode = domain; > > - msg_set_type(mhdr, TIPC_NAMED_MSG); > > - msg_set_hdr_sz(mhdr, NAMED_H_SIZE); > > - msg_set_nametype(mhdr, type); > > - msg_set_nameinst(mhdr, inst); > > - msg_set_lookup_scope(mhdr, tipc_addr_scope(domain)); > > + msg_set_type(hdr, TIPC_NAMED_MSG); > > + msg_set_hdr_sz(hdr, NAMED_H_SIZE); > > + msg_set_nametype(hdr, type); > > + msg_set_nameinst(hdr, inst); > > + msg_set_lookup_scope(hdr, tipc_addr_scope(domain)); > > dport = tipc_nametbl_translate(net, type, inst, &dnode); > > - msg_set_destnode(mhdr, dnode); > > - msg_set_destport(mhdr, dport); > > + msg_set_destnode(hdr, dnode); > > + msg_set_destport(hdr, dport); > > if (unlikely(!dport && !dnode)) > > return -EHOSTUNREACH; > > + > > } else if (dest->addrtype == TIPC_ADDR_ID) { > > dnode = dest->addr.id.node; > > - msg_set_type(mhdr, TIPC_DIRECT_MSG); > > - msg_set_lookup_scope(mhdr, 0); > > - msg_set_destnode(mhdr, dnode); > > - msg_set_destport(mhdr, dest->addr.id.ref); > > - msg_set_hdr_sz(mhdr, BASIC_H_SIZE); > > + msg_set_type(hdr, TIPC_DIRECT_MSG); > > + msg_set_lookup_scope(hdr, 0); > > + msg_set_destnode(hdr, dnode); > > + msg_set_destport(hdr, dest->addr.id.ref); > > + msg_set_hdr_sz(hdr, BASIC_H_SIZE); > > } > > > > - skb_queue_head_init(&pktchain); > > - save = m->msg_iter; > > -new_mtu: > > + /* Block or return if destination link is congested */ > > + rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode)); > > + if (unlikely(rc)) > > + return rc; > > + > > + skb_queue_head_init(&pkts); > > mtu = tipc_node_get_mtu(net, dnode, tsk->portid); > > - rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain); > > - if (rc < 0) > > + rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts); > > + if (unlikely(rc != dlen)) > > return rc; > > > > - do { > > - skb = skb_peek(&pktchain); > > - TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; > > - rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid); > > - if (likely(!rc)) { > > - if (!is_connectionless) > > - tipc_set_sk_state(sk, TIPC_CONNECTING); > > - return dsz; > > - } > > - if (rc == -ELINKCONG) { > > - tsk->link_cong = 1; > > - rc = tipc_wait_for_cond(sock, &timeo, !tsk->link_cong); > > - if (!rc) > > - continue; > > - } > > - __skb_queue_purge(&pktchain); > > - if (rc == -EMSGSIZE) { > > - m->msg_iter = save; > > - goto new_mtu; > > - } > > - break; > > - } while (1); > > + rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); > > + if (unlikely(rc == -ELINKCONG)) { > > + u32_push(clinks, dnode); > > + tsk->cong_link_cnt++; > > + rc = 0; > > + } > > > > - return rc; > > + if (unlikely(syn && !rc)) > > + tipc_set_sk_state(sk, TIPC_CONNECTING); > > + > > + return rc ? rc : dlen; > > } > > > > /** > > - * tipc_send_stream - send stream-oriented data > > + * tipc_sendstream - send stream-oriented data > > * @sock: socket structure > > * @m: data to send > > * @dsz: total length of data to be transmitted > > @@ -1013,97 +999,69 @@ static int __tipc_sendmsg(struct socket *sock, struct > > msghdr *m, size_t dsz) > > * Returns the number of bytes sent on success (or partial success), > > * or errno if no data sent > > */ > > -static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t > > dsz) > > +static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t > > dsz) > > { > > struct sock *sk = sock->sk; > > int ret; > > > > lock_sock(sk); > > - ret = __tipc_send_stream(sock, m, dsz); > > + ret = __tipc_sendstream(sock, m, dsz); > > release_sock(sk); > > > > return ret; > > } > > > > -static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t > dsz) > > +static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t > dlen) > > { > > struct sock *sk = sock->sk; > > - struct net *net = sock_net(sk); > > - struct tipc_sock *tsk = tipc_sk(sk); > > - struct tipc_msg *mhdr = &tsk->phdr; > > - struct sk_buff_head pktchain; > > DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); > > - u32 portid = tsk->portid; > > - int rc = -EINVAL; > > - long timeo; > > - u32 dnode; > > - uint mtu, send, sent = 0; > > - struct iov_iter save; > > - int hlen = MIN_H_SIZE; > > - > > - /* Handle implied connection establishment */ > > - if (unlikely(dest)) { > > - rc = __tipc_sendmsg(sock, m, dsz); > > - hlen = msg_hdr_sz(mhdr); > > - if (dsz && (dsz == rc)) > > - tsk->snt_unacked = tsk_inc(tsk, dsz + hlen); > > - return rc; > > - } > > - if (dsz > (uint)INT_MAX) > > - return -EMSGSIZE; > > - > > - if (unlikely(!tipc_sk_connected(sk))) { > > - if (sk->sk_state == TIPC_DISCONNECTING) > > - return -EPIPE; > > - else > > - return -ENOTCONN; > > - } > > + long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); > > + struct tipc_sock *tsk = tipc_sk(sk); > > + struct tipc_msg *hdr = &tsk->phdr; > > + struct net *net = sock_net(sk); > > + struct sk_buff_head pkts; > > + u32 dnode = tsk_peer_node(tsk); > > + int send, sent = 0; > > + int rc = 0; > > > > - timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); > > - if (!timeo && tsk->link_cong) > > - return -ELINKCONG; > > + skb_queue_head_init(&pkts); > > > > - dnode = tsk_peer_node(tsk); > > - skb_queue_head_init(&pktchain); > > + if (unlikely(dlen > INT_MAX)) > > + return -EMSGSIZE; > > > > -next: > > - save = m->msg_iter; > > - mtu = tsk->max_pkt; > > - send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); > > - rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain); > > - if (unlikely(rc < 0)) > > + /* Handle implicit connection setup */ > > + if (unlikely(dest)) { > > + rc = __tipc_sendmsg(sock, m, dlen); > > + if (dlen && (dlen == rc)) > > + tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr)); > > return rc; > > + } > > > > do { > > - if (likely(!tsk_conn_cong(tsk))) { > > - rc = tipc_node_xmit(net, &pktchain, dnode, portid); > > - if (likely(!rc)) { > > - tsk->snt_unacked += tsk_inc(tsk, send + hlen); > > - sent += send; > > - if (sent == dsz) > > - return dsz; > > - goto next; > > - } > > - if (rc == -EMSGSIZE) { > > - __skb_queue_purge(&pktchain); > > - tsk->max_pkt = tipc_node_get_mtu(net, dnode, > > - portid); > > - m->msg_iter = save; > > - goto next; > > - } > > - if (rc != -ELINKCONG) > > - break; > > - > > - tsk->link_cong = 1; > > - } > > - rc = tipc_wait_for_cond(sock, &timeo, > > - (!tsk->link_cong && > > + rc = tipc_wait_for_cond(sock, &timeout, > > + (!tsk->cong_link_cnt && > > !tsk_conn_cong(tsk) && > > tipc_sk_connected(sk))); > > - } while (!rc); > > + if (unlikely(rc)) > > + break; > > + > > + send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE); > > + rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts); > > + if (unlikely(rc != send)) > > + break; > > + > > + rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); > > + if (unlikely(rc == -ELINKCONG)) { > > + tsk->cong_link_cnt = 1; > > + rc = 0; > > + } > > + if (likely(!rc)) { > > + tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE); > > + sent += send; > > + } > > + } while (sent < dlen && !rc); > > > > - __skb_queue_purge(&pktchain); > > - return sent ? sent : rc; > > + return rc ? rc : sent; > > } > > > > /** > > @@ -1121,7 +1079,7 @@ static int tipc_send_packet(struct socket *sock, > > struct > > msghdr *m, size_t dsz) > > if (dsz > TIPC_MAX_USER_MSG_SIZE) > > return -EMSGSIZE; > > > > - return tipc_send_stream(sock, m, dsz); > > + return tipc_sendstream(sock, m, dsz); > > } > > > > /* tipc_sk_finish_conn - complete the setup of a connection > > @@ -1688,6 +1646,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff > > *skb, > > unsigned int limit = rcvbuf_limit(sk, skb); > > int err = TIPC_OK; > > int usr = msg_user(hdr); > > + u32 onode; > > > > if (unlikely(msg_user(hdr) == CONN_MANAGER)) { > > tipc_sk_proto_rcv(tsk, skb, xmitq); > > @@ -1695,8 +1654,10 @@ static bool filter_rcv(struct sock *sk, struct > > sk_buff > > *skb, > > } > > > > if (unlikely(usr == SOCK_WAKEUP)) { > > + onode = msg_orignode(hdr); > > kfree_skb(skb); > > - tsk->link_cong = 0; > > + u32_del(&tsk->cong_links, onode); > > + tsk->cong_link_cnt--; > > sk->sk_write_space(sk); > > return false; > > } > > @@ -2104,7 +2065,7 @@ static int tipc_accept(struct socket *sock, struct > > socket > > *new_sock, int flags) > > struct msghdr m = {NULL,}; > > > > tsk_advance_rx_queue(sk); > > - __tipc_send_stream(new_sock, &m, 0); > > + __tipc_sendstream(new_sock, &m, 0); > > } else { > > __skb_dequeue(&sk->sk_receive_queue); > > __skb_queue_head(&new_sk->sk_receive_queue, buf); > > @@ -2565,7 +2526,7 @@ static const struct proto_ops stream_ops = { > > .shutdown = tipc_shutdown, > > .setsockopt = tipc_setsockopt, > > .getsockopt = tipc_getsockopt, > > - .sendmsg = tipc_send_stream, > > + .sendmsg = tipc_sendstream, > > .recvmsg = tipc_recv_stream, > > .mmap = sock_no_mmap, > > .sendpage = sock_no_sendpage > > -- > > 2.7.4 ------------------------------------------------------------------------------ Developer Access Program for Intel Xeon Phi Processors Access to Intel Xeon Phi processor-based developer platforms. With one year of Intel Parallel Studio XE. Training and support from Colfax. Order your platform today.http://sdm.link/intel _______________________________________________ tipc-discussion mailing list tipc-discussion@lists.sourceforge.net https://lists.sourceforge.net/lists/listinfo/tipc-discussion