This commit is an optimization that builds on top of commit 01883eda72bd
("rds: support for zcopy completion notification") for PF_RDS sockets.

Cookies associated with zerocopy completion are passed up on the POLLIN
channel, piggybacked with data wherever possible. Such cookies are passed
up as ancillary data (at level SOL_RDS) in a struct rds_zcopy_cookies when
the returned value of recvmsg() is >= 0. A maximum of
SO_EE_ORIGIN_MAX_ZCOOKIES cookies may be passed with each message.

Signed-off-by: Sowmini Varadhan <sowmini.varad...@oracle.com>
---
 include/uapi/linux/rds.h |    8 +++++++
 net/rds/recv.c           |   47 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 55 insertions(+), 0 deletions(-)

diff --git a/include/uapi/linux/rds.h b/include/uapi/linux/rds.h
index 12e3bca..e733c01 100644
--- a/include/uapi/linux/rds.h
+++ b/include/uapi/linux/rds.h
@@ -37,6 +37,8 @@
 
 #include <linux/types.h>
 #include <linux/socket.h>              /* For __kernel_sockaddr_storage. */
+#include <linux/time.h>
+#include <linux/errqueue.h>
 
 #define RDS_IB_ABI_VERSION             0x301
 
@@ -104,6 +106,7 @@
 #define RDS_CMSG_MASKED_ATOMIC_CSWP    9
 #define RDS_CMSG_RXPATH_LATENCY                11
 #define        RDS_CMSG_ZCOPY_COOKIE           12
+#define        RDS_CMSG_ZCOPY_COMPLETION       13
 
 #define RDS_INFO_FIRST                 10000
 #define RDS_INFO_COUNTERS              10000
@@ -317,6 +320,11 @@ struct rds_rdma_notify {
 #define RDS_RDMA_DROPPED       3
 #define RDS_RDMA_OTHER_ERROR   4
 
+struct rds_zcopy_cookies {
+       __u32 num;      /* number of valid entries in cookies[] */
+       __u32 cookies[SO_EE_ORIGIN_MAX_ZCOOKIES];
+};
+
 /*
  * Common set of flags for all RDMA related structs
  */
diff --git a/net/rds/recv.c b/net/rds/recv.c
index b080961..44da829 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -577,6 +577,43 @@ static int rds_cmsg_recv(struct rds_incoming *inc, struct 
msghdr *msg,
        return ret;
 }
 
+/* Unlink one zerocopy completion notification from the socket error queue
+ * and pass its cookies to the caller as an RDS_CMSG_ZCOPY_COMPLETION cmsg.
+ * Returns the number of cookies in that notification, or 0 if none was found.
+ */
+static int rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg)
+{
+       struct sk_buff *skb, *tmp, *found = NULL;
+       struct sock *sk = rds_rs_to_sk(rs);
+       struct sk_buff_head *q = &sk->sk_error_queue;
+       struct rds_zcopy_cookies done;
+       unsigned long flags;
+       u32 i;
+
+       spin_lock_irqsave(&q->lock, flags);
+       skb_queue_walk_safe(q, skb, tmp) {
+               if (SKB_EXT_ERR(skb)->ee.ee_origin == SO_EE_ORIGIN_ZCOOKIE) {
+                       __skb_unlink(skb, q);
+                       found = skb;
+                       break;
+               }
+       }
+       spin_unlock_irqrestore(&q->lock, flags);
+
+       /* When nothing matches, the walk leaves skb pointing at the list
+        * head rather than NULL, so the hit is tracked in 'found' instead.
+        */
+       if (!found)
+               return 0;
+       memset(&done, 0, sizeof(done));
+       done.num = SKB_EXT_ERR(found)->ee.ee_data;
+       for (i = 0; i < done.num; i++)
+               done.cookies[i] = ((u32 *)found->data)[i];
+       put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(done), &done);
+       consume_skb(found);
+       return done.num;
+}
+
 int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
                int msg_flags)
 {
@@ -586,6 +623,7 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, 
size_t size,
        int ret = 0, nonblock = msg_flags & MSG_DONTWAIT;
        DECLARE_SOCKADDR(struct sockaddr_in *, sin, msg->msg_name);
        struct rds_incoming *inc = NULL;
+       int ncookies;
 
        /* udp_recvmsg()->sock_recvtimeo() gets away without locking too.. */
        timeo = sock_rcvtimeo(sk, nonblock);
@@ -609,6 +647,14 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, 
size_t size,
                        break;
                }
 
+               if (list_empty(&rs->rs_recv_queue) && nonblock) {
+                       ncookies = rds_recvmsg_zcookie(rs, msg);
+                       if (ncookies) {
+                               ret = 0;
+                               break;
+                       }
+               }
+
                if (!rds_next_incoming(rs, &inc)) {
                        if (nonblock) {
                                ret = -EAGAIN;
@@ -656,6 +702,7 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, 
size_t size,
                        msg->msg_flags |= MSG_TRUNC;
                }
 
+               ncookies = rds_recvmsg_zcookie(rs, msg);
                if (rds_cmsg_recv(inc, msg, rs)) {
                        ret = -EFAULT;
                        goto out;
-- 
1.7.1

Reply via email to