It is currently impossible for a client to send a connection setup
message with data, directly followed by one or more other data messages.

A way to fix this is to note that we are sending a service-addressed
message from a CONNECTING SOCK_SEQPACKET/SOCK_STREAM socket, and to define
a new FOLLOW_UP flag in the message header (bit 18 in word 0, which is
shared with the src_droppable flag). When the accepting socket sees this
bit set, it checks whether there is already a spawned socket for that peer.
If so, it delivers the message to that socket instead of creating a new one.

Signed-off-by: Hoang Le <hoang.h...@dektech.com.au>
---
 net/tipc/msg.h    | 18 ++++++++++++
 net/tipc/node.c   | 26 +++++++++++++++++
 net/tipc/node.h   |  1 +
 net/tipc/socket.c | 84 +++++++++++++++++++++++++++++++++++++++++++++++++++----
 4 files changed, 124 insertions(+), 5 deletions(-)

diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index a4e944d59394..0ca56c44f4b4 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -246,6 +246,19 @@ static inline void msg_set_src_droppable(struct tipc_msg 
*m, u32 d)
        msg_set_bits(m, 0, 18, 1, d);
 }
 
+/* FOLLOW_UP flag: marks a data message sent right after a connection setup
+ * message. NOTE(review): word 0, bit 18 is also used by src_droppable above.
+ */
+static inline int msg_follow_up(struct tipc_msg *m)
+{
+       return msg_bits(m, 0, 18, 1);
+}
+
+static inline void msg_set_follow_up(struct tipc_msg *m, u32 d)
+{
+       msg_set_bits(m, 0, 18, 1, d);
+}
+
 static inline void msg_set_size(struct tipc_msg *m, u32 sz)
 {
        m->hdr[0] = htonl((msg_word(m, 0) & ~0x1ffff) | sz);
@@ -564,6 +577,11 @@ static inline void msg_set_nameupper(struct tipc_msg *m, 
u32 n)
 #define GRP_REMIT_MSG        5
 
 /*
+ * Flag value: message follows a connection setup message
+ */
+#define FOLLOW_UP_MSG       1
+
+/*
  * Word 1
  */
 static inline u32 msg_seq_gap(struct tipc_msg *m)
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 6a44eb812baf..4ce059194dd2 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -521,6 +521,32 @@ int tipc_node_add_conn(struct net *net, u32 dnode, u32 
port, u32 peer_port)
        return err;
 }
 
+/* Find the local port connected to @peer_port on @dnode by walking the node's
+ * conn_sks list under the read lock. Returns 0 for the own node or no match.
+ */
+u32 tipc_node_lkup_conn(struct net *net, u32 dnode, u32 peer_port)
+{
+       struct tipc_sock_conn *conn;
+       struct tipc_node *node;
+       u32 portid = 0;
+
+       if (in_own_node(net, dnode))
+               return 0;
+       node = tipc_node_find(net, dnode);
+       if (!node)
+               return 0;
+       tipc_node_read_lock(node);
+       list_for_each_entry(conn, &node->conn_sks, list) {
+               if (peer_port == conn->peer_port) {
+                       portid = conn->port;
+                       break;
+               }
+       }
+       tipc_node_read_unlock(node);
+       tipc_node_put(node);
+       return portid;
+}
+
 void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
 {
        struct tipc_node *node;
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 846c8f240872..20d1699f4b94 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -81,6 +81,7 @@ void tipc_node_unsubscribe(struct net *net, struct list_head 
*subscr, u32 addr);
 void tipc_node_broadcast(struct net *net, struct sk_buff *skb);
 int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
 void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
+u32 tipc_node_lkup_conn(struct net *net, u32 dnode, u32 peer_port);
 int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel);
 bool tipc_node_is_up(struct net *net, u32 addr);
 u16 tipc_node_get_capabilities(struct net *net, u32 addr);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 14a5d055717d..552e435ffa06 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -138,6 +138,9 @@ static void tipc_sk_remove(struct tipc_sock *tsk);
 static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t 
dsz);
 static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
 
+/* Forward to CONNECTING socket if it is already being spawned */
+static bool __tipc_sk_spawned_enqueue(struct sock *sk, struct sk_buff *skb);
+
 static const struct proto_ops packet_ops;
 static const struct proto_ops stream_ops;
 static const struct proto_ops msg_ops;
@@ -1310,7 +1313,10 @@ static int __tipc_sendmsg(struct socket *sock, struct 
msghdr *m, size_t dlen)
                if (sk->sk_state == TIPC_LISTEN)
                        return -EPIPE;
                if (sk->sk_state != TIPC_OPEN)
-                       return -EISCONN;
+                       if (sk->sk_state != TIPC_CONNECTING ||
+                           (msg_follow_up(hdr) != FOLLOW_UP_MSG))
+                               return -EISCONN;
+
                if (tsk->published)
                        return -EOPNOTSUPP;
                if (dest->addrtype == TIPC_ADDR_NAME) {
@@ -1416,6 +1422,7 @@ static int __tipc_sendstream(struct socket *sock, struct 
msghdr *m, size_t dlen)
 
        /* Handle implicit connection setup */
        if (unlikely(dest)) {
+               msg_set_follow_up(hdr, FOLLOW_UP_MSG);
                rc = __tipc_sendmsg(sock, m, dlen);
                if (dlen && (dlen == rc))
                        tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
@@ -2079,6 +2086,37 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct 
sk_buff *skb)
        return FLOWCTL_MSG_LIM;
 }
 
+/* Queue @skb on the socket already spawned for its peer, if any. Returns true
+ * only if queued; otherwise caller keeps @skb. @skb must be unlinked on entry.
+ */
+static bool __tipc_sk_spawned_enqueue(struct sock *sk, struct sk_buff *skb)
+{
+       struct tipc_msg *hdr = buf_msg(skb);
+       struct net *net = sock_net(sk);
+       struct sock *spawned_sk;
+       struct tipc_sock *tsk;
+       bool queued = false;
+       u32 portid;
+
+       if (sk->sk_state != TIPC_LISTEN || msg_follow_up(hdr) != FOLLOW_UP_MSG)
+               return false;
+       portid = tipc_node_lkup_conn(net, msg_orignode(hdr), msg_origport(hdr));
+       tsk = portid ? tipc_sk_lookup(net, portid) : NULL;
+       if (unlikely(!tsk))
+               return false;
+       spawned_sk = &tsk->sk;
+       /* Report failure if the lock is contended so @skb is not leaked */
+       if (likely(spin_trylock_bh(&spawned_sk->sk_lock.slock))) {
+               __skb_queue_tail(&spawned_sk->sk_receive_queue, skb);
+               skb_set_owner_r(skb, spawned_sk);
+               spawned_sk->sk_data_ready(spawned_sk);
+               spin_unlock_bh(&spawned_sk->sk_lock.slock);
+               queued = true;
+       }
+       sock_put(spawned_sk);
+       return queued;
+}
+
 /**
  * tipc_sk_filter_rcv - validate incoming message
  * @sk: socket
@@ -2114,6 +2152,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct 
sk_buff *skb,
        /* Validate and add to receive buffer if there is space */
        while ((skb = __skb_dequeue(&inputq))) {
                hdr = buf_msg(skb);
+
                limit = rcvbuf_limit(sk, skb);
                if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) ||
                    (!sk_conn && msg_connected(hdr)) ||
@@ -2129,9 +2168,14 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct 
sk_buff *skb,
                        err = TIPC_OK;
                        continue;
                }
-               __skb_queue_tail(&sk->sk_receive_queue, skb);
-               skb_set_owner_r(skb, sk);
-               sk->sk_data_ready(sk);
+
+               /* Redirect to spawned socket (CONNECTING/ESTABLISHED)
+                * if this skb is sent together with connection setup message. 
*/
+               if (!__tipc_sk_spawned_enqueue(sk, skb)) {
+                       __skb_queue_tail(&sk->sk_receive_queue, skb);
+                       skb_set_owner_r(skb, sk);
+                       sk->sk_data_ready(sk);
+               }
        }
 }
 
@@ -2443,6 +2487,9 @@ static int tipc_accept(struct socket *sock, struct socket 
*new_sock, int flags,
        struct sk_buff *buf;
        struct tipc_sock *new_tsock;
        struct tipc_msg *msg;
+       u32 peer_port = 0;
+       u32 peer_node = 0;
+       u32 portid = 0;
        long timeo;
        int res;
 
@@ -2458,15 +2505,24 @@ static int tipc_accept(struct socket *sock, struct 
socket *new_sock, int flags,
                goto exit;
 
        buf = skb_peek(&sk->sk_receive_queue);
+       msg = buf_msg(buf);
+
+       /*
+        * Forward to CONNECTING socket if it is already being spawned
+        */
+       if (__tipc_sk_spawned_enqueue(sk, buf)) {
+               __skb_dequeue(&sk->sk_receive_queue);
+               goto exit;
+       }
 
        res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, kern);
        if (res)
                goto exit;
+
        security_sk_clone(sock->sk, new_sock->sk);
 
        new_sk = new_sock->sk;
        new_tsock = tipc_sk(new_sk);
-       msg = buf_msg(buf);
 
        /* we lock on new_sk; but lockdep sees the lock on sk */
        lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
@@ -2499,7 +2555,25 @@ static int tipc_accept(struct socket *sock, struct 
socket *new_sock, int flags,
                __skb_dequeue(&sk->sk_receive_queue);
                __skb_queue_head(&new_sk->sk_receive_queue, buf);
                skb_set_owner_r(buf, new_sk);
+
+               /*
+                * If there are more other data messages follow with
+                * connection setup message, enqueue on new socket.
+                */
+               peer_port = tsk_peer_port(new_tsock);
+               peer_node = tsk_peer_node(new_tsock);
+               portid = msg_destport(msg);
+               while ((buf = tipc_skb_dequeue(&sk->sk_receive_queue, portid)) 
!= NULL) {
+                       if ((msg_orignode(buf_msg(buf)) == peer_node) &&
+                           (msg_origport(buf_msg(buf)) == peer_port)) {
+                               __skb_queue_tail(&new_sk->sk_receive_queue, 
buf);
+                               skb_set_owner_r(buf, new_sk);
+                       } else {
+                               __skb_queue_tail(&sk->sk_receive_queue, buf);
+                       }
+               }
        }
+
        release_sock(new_sk);
 exit:
        release_sock(sk);
-- 
2.7.4


------------------------------------------------------------------------------
Check out the vibrant tech community on one of the world's most
engaging tech sites, Slashdot.org! http://sdm.link/slashdot
_______________________________________________
tipc-discussion mailing list
tipc-discussion@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/tipc-discussion

Reply via email to