See below.
///jon

> -----Original Message-----
> From: Jon Maloy [mailto:jon.ma...@ericsson.com]
> Sent: Thursday, 22 December, 2016 09:51
> To: tipc-discussion@lists.sourceforge.net; Parthasarathy Bhuvaragan
> <parthasarathy.bhuvara...@ericsson.com>; Ying Xue

[...]

>  }
> 
> -static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
> +static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
>  {
> -     DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
>       struct sock *sk = sock->sk;
> -     struct tipc_sock *tsk = tipc_sk(sk);
>       struct net *net = sock_net(sk);
> -     struct tipc_msg *mhdr = &tsk->phdr;
> -     u32 dnode, dport;
> -     struct sk_buff_head pktchain;
> -     bool is_connectionless = tipc_sk_type_connectionless(sk);
> -     struct sk_buff *skb;
> +     struct tipc_sock *tsk = tipc_sk(sk);
> +     DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
> +     long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
> +     struct list_head *clinks = &tsk->cong_links;
> +     bool syn = !tipc_sk_type_connectionless(sk);
> +     struct tipc_msg *hdr = &tsk->phdr;
>       struct tipc_name_seq *seq;
> -     struct iov_iter save;
> -     u32 mtu;
> -     long timeo;
> -     int rc;
> +     struct sk_buff_head pkts;
> +     u32 type, inst, domain;
> +     u32 dnode, dport;
> +     int mtu, rc;
> 
> -     if (dsz > TIPC_MAX_USER_MSG_SIZE)
> +     if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
>               return -EMSGSIZE;
> +
>       if (unlikely(!dest)) {
> -             if (is_connectionless && tsk->peer.family == AF_TIPC)
> -                     dest = &tsk->peer;
> -             else
> +             dest = &tsk->peer;
> +             if (!syn || dest->family != AF_TIPC)
>                       return -EDESTADDRREQ;
> -     } else if (unlikely(m->msg_namelen < sizeof(*dest)) ||
> -                dest->family != AF_TIPC) {
> -             return -EINVAL;
>       }
> -     if (!is_connectionless) {
> +
> +     if (unlikely(m->msg_namelen < sizeof(*dest)))
> +             return -EINVAL;
> +
> +     if (unlikely(dest->family != AF_TIPC))
> +             return -EINVAL;
> +
> +     if (unlikely(syn)) {

This one confuses me. I cannot think of any setup scenario where a SYN would be
sent this way. But I believe that Erik at some point added the ability to do
connect() on datagram sockets, i.e. similar to the way you can do connect() on
UDP. Is this it? If so, is it correct to test for !is_connectionless() as we
do?
In brief, does anybody understand this part of the code?

///jon


>               if (sk->sk_state == TIPC_LISTEN)
>                       return -EPIPE;
>               if (sk->sk_state != TIPC_OPEN)
> @@ -938,72 +934,62 @@ static int __tipc_sendmsg(struct socket *sock, struct
> msghdr *m, size_t dsz)
>                       tsk->conn_instance = dest->addr.name.name.instance;
>               }
>       }
> -     seq = &dest->addr.nameseq;
> -     timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
> 
> -     if (dest->addrtype == TIPC_ADDR_MCAST) {
> -             return tipc_sendmcast(sock, seq, m, dsz, timeo);
> -     } else if (dest->addrtype == TIPC_ADDR_NAME) {
> -             u32 type = dest->addr.name.name.type;
> -             u32 inst = dest->addr.name.name.instance;
> -             u32 domain = dest->addr.name.domain;
> +     seq = &dest->addr.nameseq;
> +     if (dest->addrtype == TIPC_ADDR_MCAST)
> +             return tipc_sendmcast(sock, seq, m, dlen, timeout);
> 
> +     if (dest->addrtype == TIPC_ADDR_NAME) {
> +             type = dest->addr.name.name.type;
> +             inst = dest->addr.name.name.instance;
> +             domain = dest->addr.name.domain;
>               dnode = domain;
> -             msg_set_type(mhdr, TIPC_NAMED_MSG);
> -             msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
> -             msg_set_nametype(mhdr, type);
> -             msg_set_nameinst(mhdr, inst);
> -             msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
> +             msg_set_type(hdr, TIPC_NAMED_MSG);
> +             msg_set_hdr_sz(hdr, NAMED_H_SIZE);
> +             msg_set_nametype(hdr, type);
> +             msg_set_nameinst(hdr, inst);
> +             msg_set_lookup_scope(hdr, tipc_addr_scope(domain));
>               dport = tipc_nametbl_translate(net, type, inst, &dnode);
> -             msg_set_destnode(mhdr, dnode);
> -             msg_set_destport(mhdr, dport);
> +             msg_set_destnode(hdr, dnode);
> +             msg_set_destport(hdr, dport);
>               if (unlikely(!dport && !dnode))
>                       return -EHOSTUNREACH;
> +
>       } else if (dest->addrtype == TIPC_ADDR_ID) {
>               dnode = dest->addr.id.node;
> -             msg_set_type(mhdr, TIPC_DIRECT_MSG);
> -             msg_set_lookup_scope(mhdr, 0);
> -             msg_set_destnode(mhdr, dnode);
> -             msg_set_destport(mhdr, dest->addr.id.ref);
> -             msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
> +             msg_set_type(hdr, TIPC_DIRECT_MSG);
> +             msg_set_lookup_scope(hdr, 0);
> +             msg_set_destnode(hdr, dnode);
> +             msg_set_destport(hdr, dest->addr.id.ref);
> +             msg_set_hdr_sz(hdr, BASIC_H_SIZE);
>       }
> 
> -     skb_queue_head_init(&pktchain);
> -     save = m->msg_iter;
> -new_mtu:
> +     /* Block or return if destination link is congested */
> +     rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode));
> +     if (unlikely(rc))
> +             return rc;
> +
> +     skb_queue_head_init(&pkts);
>       mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
> -     rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain);
> -     if (rc < 0)
> +     rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
> +     if (unlikely(rc != dlen))
>               return rc;
> 
> -     do {
> -             skb = skb_peek(&pktchain);
> -             TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
> -             rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid);
> -             if (likely(!rc)) {
> -                     if (!is_connectionless)
> -                             tipc_set_sk_state(sk, TIPC_CONNECTING);
> -                     return dsz;
> -             }
> -             if (rc == -ELINKCONG) {
> -                     tsk->link_cong = 1;
> -                     rc = tipc_wait_for_cond(sock, &timeo, !tsk->link_cong);
> -                     if (!rc)
> -                             continue;
> -             }
> -             __skb_queue_purge(&pktchain);
> -             if (rc == -EMSGSIZE) {
> -                     m->msg_iter = save;
> -                     goto new_mtu;
> -             }
> -             break;
> -     } while (1);
> +     rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
> +     if (unlikely(rc == -ELINKCONG)) {
> +             u32_push(clinks, dnode);
> +             tsk->cong_link_cnt++;
> +             rc = 0;
> +     }
> 
> -     return rc;
> +     if (unlikely(syn && !rc))
> +             tipc_set_sk_state(sk, TIPC_CONNECTING);
> +
> +     return rc ? rc : dlen;
>  }
> 
>  /**
> - * tipc_send_stream - send stream-oriented data
> + * tipc_sendstream - send stream-oriented data
>   * @sock: socket structure
>   * @m: data to send
>   * @dsz: total length of data to be transmitted
> @@ -1013,97 +999,69 @@ static int __tipc_sendmsg(struct socket *sock, struct
> msghdr *m, size_t dsz)
>   * Returns the number of bytes sent on success (or partial success),
>   * or errno if no data sent
>   */
> -static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t 
> dsz)
> +static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)
>  {
>       struct sock *sk = sock->sk;
>       int ret;
> 
>       lock_sock(sk);
> -     ret = __tipc_send_stream(sock, m, dsz);
> +     ret = __tipc_sendstream(sock, m, dsz);
>       release_sock(sk);
> 
>       return ret;
>  }
> 
> -static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t 
> dsz)
> +static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t 
> dlen)
>  {
>       struct sock *sk = sock->sk;
> -     struct net *net = sock_net(sk);
> -     struct tipc_sock *tsk = tipc_sk(sk);
> -     struct tipc_msg *mhdr = &tsk->phdr;
> -     struct sk_buff_head pktchain;
>       DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
> -     u32 portid = tsk->portid;
> -     int rc = -EINVAL;
> -     long timeo;
> -     u32 dnode;
> -     uint mtu, send, sent = 0;
> -     struct iov_iter save;
> -     int hlen = MIN_H_SIZE;
> -
> -     /* Handle implied connection establishment */
> -     if (unlikely(dest)) {
> -             rc = __tipc_sendmsg(sock, m, dsz);
> -             hlen = msg_hdr_sz(mhdr);
> -             if (dsz && (dsz == rc))
> -                     tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
> -             return rc;
> -     }
> -     if (dsz > (uint)INT_MAX)
> -             return -EMSGSIZE;
> -
> -     if (unlikely(!tipc_sk_connected(sk))) {
> -             if (sk->sk_state == TIPC_DISCONNECTING)
> -                     return -EPIPE;
> -             else
> -                     return -ENOTCONN;
> -     }
> +     long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
> +     struct tipc_sock *tsk = tipc_sk(sk);
> +     struct tipc_msg *hdr = &tsk->phdr;
> +     struct net *net = sock_net(sk);
> +     struct sk_buff_head pkts;
> +     u32 dnode = tsk_peer_node(tsk);
> +     int send, sent = 0;
> +     int rc = 0;
> 
> -     timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
> -     if (!timeo && tsk->link_cong)
> -             return -ELINKCONG;
> +     skb_queue_head_init(&pkts);
> 
> -     dnode = tsk_peer_node(tsk);
> -     skb_queue_head_init(&pktchain);
> +     if (unlikely(dlen > INT_MAX))
> +             return -EMSGSIZE;
> 
> -next:
> -     save = m->msg_iter;
> -     mtu = tsk->max_pkt;
> -     send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
> -     rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain);
> -     if (unlikely(rc < 0))
> +     /* Handle implicit connection setup */
> +     if (unlikely(dest)) {
> +             rc = __tipc_sendmsg(sock, m, dlen);
> +             if (dlen && (dlen == rc))
> +                     tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
>               return rc;
> +     }
> 
>       do {
> -             if (likely(!tsk_conn_cong(tsk))) {
> -                     rc = tipc_node_xmit(net, &pktchain, dnode, portid);
> -                     if (likely(!rc)) {
> -                             tsk->snt_unacked += tsk_inc(tsk, send + hlen);
> -                             sent += send;
> -                             if (sent == dsz)
> -                                     return dsz;
> -                             goto next;
> -                     }
> -                     if (rc == -EMSGSIZE) {
> -                             __skb_queue_purge(&pktchain);
> -                             tsk->max_pkt = tipc_node_get_mtu(net, dnode,
> -                                                              portid);
> -                             m->msg_iter = save;
> -                             goto next;
> -                     }
> -                     if (rc != -ELINKCONG)
> -                             break;
> -
> -                     tsk->link_cong = 1;
> -             }
> -             rc = tipc_wait_for_cond(sock, &timeo,
> -                                     (!tsk->link_cong &&
> +             rc = tipc_wait_for_cond(sock, &timeout,
> +                                     (!tsk->cong_link_cnt &&
>                                        !tsk_conn_cong(tsk) &&
>                                        tipc_sk_connected(sk)));
> -     } while (!rc);
> +             if (unlikely(rc))
> +                     break;
> +
> +             send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
> +             rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts);
> +             if (unlikely(rc != send))
> +                     break;
> +
> +             rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
> +             if (unlikely(rc == -ELINKCONG)) {
> +                     tsk->cong_link_cnt = 1;
> +                     rc = 0;
> +             }
> +             if (likely(!rc)) {
> +                     tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE);
> +                     sent += send;
> +             }
> +     } while (sent < dlen && !rc);
> 
> -     __skb_queue_purge(&pktchain);
> -     return sent ? sent : rc;
> +     return rc ? rc : sent;
>  }
> 
>  /**
> @@ -1121,7 +1079,7 @@ static int tipc_send_packet(struct socket *sock, struct
> msghdr *m, size_t dsz)
>       if (dsz > TIPC_MAX_USER_MSG_SIZE)
>               return -EMSGSIZE;
> 
> -     return tipc_send_stream(sock, m, dsz);
> +     return tipc_sendstream(sock, m, dsz);
>  }
> 
>  /* tipc_sk_finish_conn - complete the setup of a connection
> @@ -1688,6 +1646,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff
> *skb,
>       unsigned int limit = rcvbuf_limit(sk, skb);
>       int err = TIPC_OK;
>       int usr = msg_user(hdr);
> +     u32 onode;
> 
>       if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
>               tipc_sk_proto_rcv(tsk, skb, xmitq);
> @@ -1695,8 +1654,10 @@ static bool filter_rcv(struct sock *sk, struct sk_buff
> *skb,
>       }
> 
>       if (unlikely(usr == SOCK_WAKEUP)) {
> +             onode = msg_orignode(hdr);
>               kfree_skb(skb);
> -             tsk->link_cong = 0;
> +             u32_del(&tsk->cong_links, onode);
> +             tsk->cong_link_cnt--;
>               sk->sk_write_space(sk);
>               return false;
>       }
> @@ -2104,7 +2065,7 @@ static int tipc_accept(struct socket *sock, struct 
> socket
> *new_sock, int flags)
>               struct msghdr m = {NULL,};
> 
>               tsk_advance_rx_queue(sk);
> -             __tipc_send_stream(new_sock, &m, 0);
> +             __tipc_sendstream(new_sock, &m, 0);
>       } else {
>               __skb_dequeue(&sk->sk_receive_queue);
>               __skb_queue_head(&new_sk->sk_receive_queue, buf);
> @@ -2565,7 +2526,7 @@ static const struct proto_ops stream_ops = {
>       .shutdown       = tipc_shutdown,
>       .setsockopt     = tipc_setsockopt,
>       .getsockopt     = tipc_getsockopt,
> -     .sendmsg        = tipc_send_stream,
> +     .sendmsg        = tipc_sendstream,
>       .recvmsg        = tipc_recv_stream,
>       .mmap           = sock_no_mmap,
>       .sendpage       = sock_no_sendpage
> --
> 2.7.4


------------------------------------------------------------------------------
Developer Access Program for Intel Xeon Phi Processors
Access to Intel Xeon Phi processor-based developer platforms.
With one year of Intel Parallel Studio XE.
Training and support from Colfax.
Order your platform today.http://sdm.link/intel
_______________________________________________
tipc-discussion mailing list
tipc-discussion@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to