Pls review the following.
I wont have performance numbers for this code that till next week.

---

Add zero copy support to synchronous socket operations (send_msg/recv_msg).
This patch also includes a couple of fixes for aio, which I'll split and
commit separately.

Signed-off-by: Michael S. Tsirkin <[EMAIL PROTECTED]>

Index: drivers/infiniband/ulp/sdp/Kconfig
===================================================================
--- drivers/infiniband/ulp/sdp/Kconfig  (revision 3958)
+++ drivers/infiniband/ulp/sdp/Kconfig  (working copy)
@@ -8,6 +8,20 @@
           libsdp library from <http://openib.org> to have standard
           sockets applications use SDP.
 
+config INFINIBAND_SDP_SEND_ZCOPY
+       bool "Sockets Direct Protocol Zero Copy Send support"
+       depends on INFINIBAND_SDP
+       default y
+       ---help---
+         This option enables Zero Copy support for send_msg transactions.
+
+config INFINIBAND_SDP_RECV_ZCOPY
+       bool "Sockets Direct Protocol Zero Copy Receive support"
+       depends on INFINIBAND_SDP && INFINIBAND_SDP_SEND_ZCOPY
+       default y
+       ---help---
+         This option enables Zero Copy support for recv_msg transactions.
+
 config INFINIBAND_SDP_DEBUG
        bool "Sockets Direct Protocol debugging"
        depends on INFINIBAND_SDP
Index: drivers/infiniband/ulp/sdp/sdp_rcvd.c
===================================================================
--- drivers/infiniband/ulp/sdp/sdp_rcvd.c       (revision 3958)
+++ drivers/infiniband/ulp/sdp/sdp_rcvd.c       (working copy)
@@ -439,6 +439,11 @@
 
                sdp_advt_destroy(advt);
        }
+
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+       /* There are no more src_pend, wake any waiting thread */
+       sdp_iocb_wake(&conn->src_wait_list);
+#endif
        /*
         * If there are active reads, mark the connection as being in
         * source cancel. Otherwise
Index: drivers/infiniband/ulp/sdp/sdp_sock.h
===================================================================
--- drivers/infiniband/ulp/sdp/sdp_sock.h       (revision 3958)
+++ drivers/infiniband/ulp/sdp/sdp_sock.h       (working copy)
@@ -61,7 +61,9 @@
 #define SDP_ZCOPY_THRSH_SRC  257 /* Threshold for AIO write advertisments */
 #define SDP_ZCOPY_THRSH_SNK  258 /* Threshold for AIO read advertisments */
 #define SDP_ZCOPY_THRSH      256 /* Convenience for read and write */
-
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+#define SDP_ZCOPY_CANCEL_TIMEOUT (HZ * 60) /* Time before abortive close */
+#endif
 /*
  * Default values for SDP specific socket options. (for reference)
  */
Index: drivers/infiniband/ulp/sdp/sdp_proto.h
===================================================================
--- drivers/infiniband/ulp/sdp/sdp_proto.h      (revision 3958)
+++ drivers/infiniband/ulp/sdp/sdp_proto.h      (working copy)
@@ -152,7 +152,10 @@
 void sdp_iocb_q_put_tail(struct sdpc_iocb_q *table, struct sdpc_iocb *iocb);
 
 struct sdpc_iocb *sdp_iocb_q_lookup(struct sdpc_iocb_q *table, u32 key);
+struct sdpc_iocb *sdp_iocb_q_lookup_req(struct sdpc_iocb_q *table, struct 
kiocb *req);
 
+void sdp_iocb_q_mark_cancel(struct sdpc_iocb_q *table, struct kiocb *req);
+
 void sdp_iocb_q_cancel(struct sdpc_iocb_q *table, u32 mask, ssize_t comp);
 
 void sdp_iocb_q_remove(struct sdpc_iocb *iocb);
@@ -197,6 +200,8 @@
                                                  void *arg),
                                    void *arg);
 
+int sdp_iocb_find_req(struct sdpc_desc *element, void *arg);
+
 int sdp_desc_q_types_size(struct sdpc_desc_q *table,
                          enum sdp_desc_type type);
 
Index: drivers/infiniband/ulp/sdp/sdp_read.c
===================================================================
--- drivers/infiniband/ulp/sdp/sdp_read.c       (revision 3958)
+++ drivers/infiniband/ulp/sdp/sdp_read.c       (working copy)
@@ -93,6 +93,12 @@
                }
        }
 
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+       /* If there are no more src_pend, wake any waiting thread */
+       if (!sdp_advt_q_size(&conn->src_pend))
+               sdp_iocb_wake(&conn->src_wait_list);
+
+#endif
 done:
        return 0;
 error:
Index: drivers/infiniband/ulp/sdp/sdp_send.c
===================================================================
--- drivers/infiniband/ulp/sdp/sdp_send.c       (revision 3958)
+++ drivers/infiniband/ulp/sdp/sdp_send.c       (working copy)
@@ -122,6 +122,10 @@
                send_param.send_flags |= IB_SEND_SIGNALED;
                conn->send_cons = 0;
        }
+
+       if (buff->bsdh_hdr->mid == SDP_MID_SRC_CANCEL)
+               sdp_dbg_ctrl(conn, "SRC_CANCEL bsdh_hdr->seq_num = %d 
conn->send_seq=%d\n",
+                            buff->bsdh_hdr->seq_num, conn->send_seq);
        /*
         * post send
         */
@@ -1680,8 +1684,8 @@
 static int sdp_inet_write_cancel(struct kiocb *req, struct io_event *ev)
 {
        struct sock_iocb *si = kiocb_to_siocb(req);
-       struct sdp_sock   *conn;
        struct sdpc_iocb *iocb;
+       struct sdp_sock *conn;
        int result = 0;
 
        sdp_dbg_ctrl(NULL, "Cancel Write IOCB user <%d> key <%d> flag <%08lx>",
@@ -1738,7 +1742,7 @@
                                /*
                                 * completion reference
                                 */
-                               aio_put_req(req);
+                               aio_put_req(iocb->req);
 
                                result = 0;
                        }
@@ -1797,9 +1801,8 @@
         * no IOCB found. The cancel is probably in a race with a completion.
         * Assume the IOCB will be completed, return appropriate value.
         */
-       sdp_warn("Cancel write with no IOCB. <%d:%d:%08lx>",
-                req->ki_users, req->ki_key, req->ki_flags);
-
+       sdp_dbg_warn(conn, "Cancel write with no IOCB. <%d:%d:%08lx>",
+                    req->ki_users, req->ki_key, req->ki_flags);
        result = -EAGAIN;
 
 unlock:
@@ -1810,7 +1813,151 @@
        return result;
 }
 
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+static int sdp_write_src_cancel(struct sdpc_desc *element, void *arg)
+{
+       struct sdpc_iocb *iocb = (struct sdpc_iocb *) element;
+       struct kiocb *req = (struct kiocb *)arg;
+
+       if (element->type == SDP_DESC_TYPE_IOCB && iocb->req == req)
+               iocb->flags |= SDP_IOCB_F_CANCEL;
+       return -ERANGE;
+}
+
+static int sdp_req_busy(struct sdp_sock *conn, struct sdpc_iocb_wait *wait)
+{
+       unsigned long flags;
+       int result = -EAGAIN;
+
+       sdp_conn_lock(conn);
+       sdp_conn_unlock(conn);
+
+       spin_lock_irqsave(&wait->lock, flags);
+       if (!wait->outstanding)
+               result = 0;
+       spin_unlock_irqrestore(&wait->lock, flags);
+       return result;
+}
 /*
+ * sdp_write_cancel - cancel a synchronous IO operation
+ */
+static int sdp_write_cancel(struct kiocb *req, struct sdp_sock *conn,
+                           struct sdpc_iocb_wait *wait)
+{
+       struct sdpc_iocb *iocb;
+       int result = 0;
+
+       sdp_dbg_ctrl(NULL, "Cancel Write IOCB user <%d> key <%d> flag <%08lx>",
+                    req->ki_users, req->ki_key, req->ki_flags);
+
+       sdp_conn_lock(conn);
+
+       sdp_dbg_ctrl(conn, "Cancel Write IOCB. <%08x:%04x> <%08x:%04x>",
+                    conn->src_addr, conn->src_port,
+                    conn->dst_addr, conn->dst_port);
+       /*
+        * attempt to find the IOCB for this key. we don't have an indication
+        * whether this is a read or write.
+        */
+
+       while ((iocb = (struct sdpc_iocb *)
+              sdp_desc_q_lookup(&conn->send_queue, sdp_iocb_find_req, req))) {
+               iocb->flags |= SDP_IOCB_F_CANCEL;
+
+               /*
+                * always remove the IOCB.
+                * If active, then place it into the correct active queue
+                */
+               sdp_desc_q_remove((struct sdpc_desc *)iocb);
+
+               if (iocb->flags & SDP_IOCB_F_ACTIVE) {
+                       if (iocb->flags & SDP_IOCB_F_RDMA_W)
+                               sdp_desc_q_put_tail(&conn->w_snk,
+                                                   (struct sdpc_desc *)iocb);
+                       else {
+                               SDP_EXPECT((iocb->flags & SDP_IOCB_F_RDMA_R));
+
+                               sdp_iocb_q_put_tail(&conn->w_src, iocb);
+                       }
+               } else {
+                       /*
+                        * empty IOCBs can be deleted, while partials
+                        * needs to be compelted.
+                        */
+                       if (iocb->post > 0) {
+                               sdp_iocb_complete(iocb, 0);
+                               result = -EAGAIN;
+                       } else {
+                               sdp_iocb_destroy(iocb);
+
+                               /*
+                                * completion reference
+                                */
+                               if (!iocb->wait)
+                                       aio_put_req(iocb->req);
+                               else {
+                                       unsigned long flags;
+                                       spin_lock_irqsave(&iocb->wait->lock, 
flags);
+                                       --iocb->wait->outstanding;
+                                       /* No need to wake up,
+                                          since we call sdp_req_busy
+                                          directly below */
+
+                                       
spin_unlock_irqrestore(&iocb->wait->lock, flags);
+                               }
+                       }
+               }
+       }
+
+       /*
+        * check the sink queue, not much to do, since the operation is
+        * already in flight.
+        */
+       sdp_desc_q_lookup(&conn->w_snk, sdp_write_src_cancel, req);
+
+       iocb = (struct sdpc_iocb *)sdp_desc_q_lookup(&conn->w_snk,
+                                                    sdp_iocb_find_req,
+                                                    req);
+       if (iocb) {
+               sdp_dbg_ctrl(conn, "Sink Queue busy\n");
+               result = -EAGAIN;
+       }
+
+       /*
+        * check source queue. If we're in the source queue, then a cancel
+        * needs to be issued.
+        */
+       sdp_iocb_q_mark_cancel(&conn->w_src, req);
+
+       iocb = sdp_iocb_q_lookup_req(&conn->w_src, req);
+       if (iocb) {
+               sdp_dbg_ctrl(conn, "Sending Src Cancel\n");
+
+               if (! (conn->flags & SDP_CONN_F_SRC_CANCEL_L)) {
+                       sdp_desc_q_lookup(&conn->w_snk, sdp_write_src_cancel, 
req);
+                       conn->flags |= SDP_CONN_F_SRC_CANCEL_L;
+                       result = sdp_send_ctrl_src_cancel(conn);
+                       SDP_EXPECT(result >= 0);
+               }
+
+               result = -EAGAIN;
+       }
+
+       if (!result) {
+               /*
+                * no IOCB found. Assume the IOCB will be completed.
+                */
+               sdp_dbg_ctrl(conn, "Cancel IOCB done. <%d:%d:%08lx>",
+                            req->ki_users, req->ki_key, req->ki_flags);
+       }
+
+       sdp_conn_unlock(conn);
+
+       return sdp_req_busy(conn, wait);
+}
+#endif
+
+/*
  * sdp_send_flush_advt - Flush passive sink advertisments
  */
 static int sdp_send_flush_advt(struct sdp_sock *conn)
@@ -1987,7 +2134,7 @@
        return timeout;
 }
 
-static inline int sdp_queue_iocb(struct kiocb *req, struct sdp_sock *conn,
+static inline int sdp_queue_aio(struct kiocb *req, struct sdp_sock *conn,
                                 struct msghdr *msg, size_t size,
                                 size_t *copied)
 {
@@ -2038,14 +2185,79 @@
        return -EIOCBQUEUED;
 }
 
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+static inline int sdp_queue_sync(struct kiocb *req, struct sdp_sock *conn,
+                                struct msghdr *msg, size_t size,
+                                size_t *copied,
+                                struct sdpc_iocb_wait *wait)
+{
+       struct sdpc_iocb *iocb;
+       struct iovec *msg_iov;
+       unsigned long flags;
+       size_t len;
+       int result;
+       /*
+        * create IOCB with remaining space
+        */
+       iocb = sdp_iocb_create();
+       if (!iocb) {
+               sdp_dbg_warn(conn, "Failed to allocate IOCB <%Zu:%ld>",
+                            size, (long)*copied);
+               return -ENOMEM;
+       }
+
+       for (msg_iov = msg->msg_iov; !msg->msg_iov->iov_len; ++msg_iov);
+
+       /* FMR alignment can add an extra page. */
+       len = min(msg_iov->iov_len, (size_t)SDP_IOCB_SIZE_MAX - 4096);
+       iocb->len  = len;
+       iocb->post = 0;
+       iocb->size = len;
+       iocb->req  = req;
+       iocb->key  = req->ki_key;
+       iocb->addr = (unsigned long)msg_iov->iov_base;
+       iocb->wait = wait;
+
+       result = sdp_iocb_lock(iocb);
+       if (result < 0) {
+               sdp_dbg_warn(conn, "Error <%d> locking IOCB <%Zu:%ld>",
+                            result, size, (long)copied);
+
+               sdp_iocb_destroy(iocb);
+               return result;
+       }
+
+       SDP_CONN_STAT_WQ_INC(conn, iocb->size);
+
+       result = sdp_send_data_queue(conn, (struct sdpc_desc *)iocb);
+       if (result < 0) {
+               sdp_dbg_warn(conn, "Error <%d> queueing write IOCB", result);
+               sdp_iocb_destroy(iocb);
+               return result;
+       }
+
+       spin_lock_irqsave(&wait->lock, flags);
+       ++wait->outstanding;
+       spin_unlock_irqrestore(&wait->lock, flags);
+
+       conn->send_pipe += len;
+       *copied += len; /* copied amount was saved in IOCB. */
+       msg_iov->iov_len -= len;
+       msg_iov->iov_base += len;
+       return 0;
+}
+#endif
 /*
  * sdp_inet_send - send data from user space to the network
  */
 int sdp_inet_send(struct kiocb *req, struct socket *sock, struct msghdr *msg,
                  size_t size)
 {
-       struct sock      *sk;
-       struct sdp_sock   *conn;
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+       struct sdpc_iocb_wait wait;
+#endif
+       struct sock *sk;
+       struct sdp_sock *conn;
        int result = 0;
        size_t copied = 0;
        int oob, zcopy;
@@ -2074,6 +2286,7 @@
        if (conn->state == SDP_CONN_ST_LISTEN ||
            conn->state == SDP_CONN_ST_CLOSED) {
                result = -ENOTCONN;
+               sdp_conn_unlock(conn);
                goto done;
        }
        /*
@@ -2082,13 +2295,24 @@
         * they are smaller then the zopy threshold, but only if there is
         * no buffer write space.
         */
-       zcopy = (size >= conn->src_zthresh && !is_sync_kiocb(req));
+       zcopy = (size >= conn->src_zthresh && (!is_sync_kiocb(req)
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+                || (!(msg->msg_flags & MSG_DONTWAIT) && !oob)
+#endif
+               ));
 
        /*
         * clear ASYN space bit, it'll be reset if there is no space.
         */
        if (!zcopy)
                clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+       else if (is_sync_kiocb(req)) {
+               init_waitqueue_head(&wait.wait);
+               spin_lock_init(&wait.lock);
+               wait.outstanding = 0;
+       }
+#endif
        /*
         * process data first if window is open, next check conditions, then
         * wait if there is more work to be done. The absolute window size is
@@ -2143,14 +2367,45 @@
                 * completion. Wait on sync IO call create IOCB for async
                 * call.
                 */
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+               if (is_sync_kiocb(req) && zcopy)
+                       result = sdp_queue_sync(req, conn, msg, size, &copied,
+                                               &wait);
+                       /* TODO: limit the # of outstanding reqs */
+                       /* TODO: sleep on recoverable errors */
+               else
+#endif
                if (is_sync_kiocb(req))
                        timeout = sdp_wait_till_space(sk, conn, oob, timeout);
                else
-                       result = sdp_queue_iocb(req, conn, msg, size, &copied);
+                       result = sdp_queue_aio(req, conn, msg, size, &copied);
        }
 
+       sdp_conn_unlock(conn);
+
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+       if (!result && is_sync_kiocb(req) && zcopy) {
+               timeout = wait_event_interruptible_timeout(wait.wait,
+                                          !sdp_req_busy(conn, &wait), timeout);
+               if (!timeout)
+                       result = -EAGAIN;
+       }
+
+       if (signal_pending(current) && is_sync_kiocb(req) && zcopy) {
+               result = (timeout > 0) ? sock_intr_errno(timeout) : -EAGAIN;
+
+               timeout = wait_event_timeout(wait.wait,
+                               !sdp_write_cancel(req, conn, &wait),
+                               SDP_ZCOPY_CANCEL_TIMEOUT);
+               if (!timeout) {
+                       sdp_warn("sdp_write_cancel timed out. Abort.\n");
+                       sdp_conn_lock(conn);
+                       sdp_conn_abort(conn);
+                       sdp_conn_unlock(conn);
+               }
+       }
+#endif
 done:
-       sdp_conn_unlock(conn);
        result = ((copied > 0) ? copied : result);
 
        if (result == -EPIPE && !(msg->msg_flags & MSG_NOSIGNAL))
Index: drivers/infiniband/ulp/sdp/sdp_conn.c
===================================================================
--- drivers/infiniband/ulp/sdp/sdp_conn.c       (revision 3958)
+++ drivers/infiniband/ulp/sdp/sdp_conn.c       (working copy)
@@ -1279,7 +1279,15 @@
         * connection lock
         */
        sdp_conn_lock_init(conn);
+
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
        /*
+       * Tasks to wake up when we finish all src avail
+       */
+       INIT_LIST_HEAD(&conn->src_wait_list);
+
+#endif
+       /*
         * insert connection into lookup table
         */
        result = sdp_conn_table_insert(conn);
Index: drivers/infiniband/ulp/sdp/sdp_recv.c
===================================================================
--- drivers/infiniband/ulp/sdp/sdp_recv.c       (revision 3958)
+++ drivers/infiniband/ulp/sdp/sdp_recv.c       (working copy)
@@ -327,6 +327,10 @@
        iocb = sdp_iocb_q_look(&conn->r_pend);
        if (!iocb)
                return ENODEV;
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+       if (iocb->flags & SDP_IOCB_F_WAITALL)
+               return ENODEV;
+#endif
        /*
         * check zcopy threshold
         */
@@ -708,6 +712,9 @@
         */
        if (!iocb->len ||
            (!conn->src_recv &&
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+            !(iocb->flags & SDP_IOCB_F_WAITALL) &&
+#endif
             !(sk_sdp(conn)->sk_rcvlowat > iocb->post))) {
                /*
                 * complete IOCB
@@ -1055,7 +1062,178 @@
        return result;
 }
 
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
 /*
+ * sdp_read_cancel - cancel a synchronous IO operation
+ */
+static int sdp_read_cancel(struct kiocb *req, struct sdp_sock *conn,
+                          struct sdpc_iocb_wait *wait, size_t *copied)
+{
+       struct sdpc_iocb *iocb;
+       int result = 0;
+
+       sdp_dbg_ctrl(NULL, "Cancel Read IOCB. user <%d> key <%d> flag <%08lx>",
+                    req->ki_users, req->ki_key, req->ki_flags);
+
+       sdp_dbg_ctrl(conn, "Cancel Read IOCB. <%08x:%04x> <%08x:%04x>",
+                    conn->src_addr, conn->src_port,
+                    conn->dst_addr, conn->dst_port);
+       /*
+        * attempt to find the IOCB for this key. we don't have an indication
+        * whether this is a read or write.
+        */
+       while ((iocb = sdp_iocb_q_lookup_req(&conn->r_pend, req))) {
+               /*
+                * always remove the IOCB. If active, then place it into
+                * the correct active queue. Inactive empty IOCBs can be
+                * deleted, while inactive partials needs to be compelted.
+                */
+               sdp_iocb_q_remove(iocb);
+
+               if (!(iocb->flags & SDP_IOCB_F_ACTIVE)) {
+                       *copied -= iocb->len;
+                       if (iocb->post > 0) {
+                               /*
+                                * callback to complete IOCB, or drop reference
+                                */
+                               sdp_iocb_complete(iocb, 0);
+                               result = -EAGAIN;
+                       }
+                       else {
+                               sdp_iocb_destroy(iocb);
+                               /*
+                                * completion reference
+                                */
+                               if (iocb->wait) {
+                                       unsigned long flags;
+                                       spin_lock_irqsave(&iocb->wait->lock, 
flags);
+                                       if (!--iocb->wait->outstanding) {
+                                               wake_up(&iocb->wait->wait);
+                                       }
+                                       
spin_unlock_irqrestore(&iocb->wait->lock, flags);
+                               } else
+                                       aio_put_req(req);
+
+                               result = 0;
+                       }
+
+                       goto out;
+               }
+
+               if (iocb->flags & SDP_IOCB_F_RDMA_W)
+                       sdp_iocb_q_put_tail(&conn->r_snk, iocb);
+               else {
+                       SDP_EXPECT((iocb->flags & SDP_IOCB_F_RDMA_R));
+
+                       sdp_desc_q_put_tail(&conn->r_src,
+                                           (struct sdpc_desc *)iocb);
+               }
+       }
+       /*
+        * check the source queue, not much to do, since the operation is
+        * already in flight.
+        */
+       iocb = (struct sdpc_iocb *)sdp_desc_q_lookup(&conn->r_src,
+                                                    sdp_iocb_find_req, req);
+       if (iocb) {
+               iocb->flags |= SDP_IOCB_F_CANCEL;
+               result = -EAGAIN;
+
+               goto out;
+       }
+       /*
+        * check sink queue. If we're in the sink queue, then a cancel
+        * needs to be issued.
+        */
+       iocb = sdp_iocb_q_lookup_req(&conn->r_snk, req);
+       if (iocb) {
+               /*
+                * Unfortunetly there is only a course grain cancel in SDP, so
+                * we have to cancel everything.
+                */
+               if (!(conn->flags & SDP_CONN_F_SNK_CANCEL)) {
+
+                       result = sdp_send_ctrl_snk_cancel(conn);
+                       SDP_EXPECT(result >= 0);
+
+                       conn->flags |= SDP_CONN_F_SNK_CANCEL;
+               }
+
+               iocb->flags |= SDP_IOCB_F_CANCEL;
+               result = -EAGAIN;
+
+               goto out;
+       }
+       /*
+        * no IOCB found. The cancel is probably in a race with a completion.
+        * Assume the IOCB will be completed, return appropriate value.
+        */
+       sdp_dbg_ctrl(NULL, "Cancel read with no IOCB. <%d:%d:%08lx>",
+                    req->ki_users, req->ki_key, req->ki_flags);
+
+       result = -EAGAIN;
+
+out:
+       return result;
+}
+
+static int sdp_req_busy(struct kiocb *req, struct sdp_sock *conn,
+                       struct sdpc_iocb_wait *wait, int waitall,
+                       size_t *copied)
+{
+       struct sdpc_iocb *iocb;
+       unsigned long flags;
+       int result = -EAGAIN;
+
+       for (;;) {
+               spin_lock_irqsave(&wait->lock, flags);
+               iocb = sdp_iocb_q_get_head(&wait->q);
+               if (!iocb)
+                       break;
+               --wait->outstanding;
+               spin_unlock_irqrestore(&wait->lock, flags);
+
+               sdp_iocb_release(iocb);
+               sdp_iocb_unlock(iocb);
+               sdp_iocb_destroy(iocb);
+       }
+
+       if (!wait->outstanding)
+               result = 0;
+
+       spin_unlock_irqrestore(&wait->lock, flags);
+
+       sdp_conn_lock(conn);
+
+       /* If WAITALL is clear, and there are no more src_pend,
+          remove all pending iocbs */
+       if (!waitall && !sdp_advt_q_size(&conn->src_pend)) {
+               sdp_read_cancel(req, conn, wait, copied);
+               result = 0;
+       }
+
+       if (!result)
+               list_del_init(&wait->src_wait_list);
+
+       sdp_conn_unlock(conn);
+
+       return result;
+}
+/*
+ * sdp_inet_read_cancel - cancel an IO operation
+ */
+static int sdp_cancel_read(struct kiocb *req, struct sdp_sock *conn,
+                          struct sdpc_iocb_wait *wait, size_t *copied)
+{
+       sdp_conn_lock(conn);
+       sdp_read_cancel(req, conn, wait, copied);
+       sdp_conn_unlock(conn);
+
+       return sdp_req_busy(req, conn, wait, 1, copied);
+}
+#endif
+
+/*
  * sdp_inet_recv - recv data from the network to user space
  */
 int sdp_inet_recv(struct kiocb  *req, struct socket *sock, struct msghdr *msg,
@@ -1065,17 +1243,22 @@
        struct sdp_sock   *conn;
        struct sdpc_iocb *iocb;
        struct sdpc_buff *buff;
-       long   timeout;
+       long   timeout = 0 /*Turn off compiler warning */;
        size_t length;
        int result = 0;
        int expect;
        int low_water;
-       int copied = 0;
+       size_t copied = 0;
        int copy;
        int update;
        s8 oob = 0;
        s8 ack = 0;
        struct sdpc_buff_q peek_queue;
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+       int zcopy = 0;
+       struct sdpc_iocb_wait wait;
+       unsigned long f;
+#endif
 
        sk = sock->sk;
        conn = sdp_sk(sk);
@@ -1293,6 +1476,76 @@
                /*
                 * Either wait or create IOCB for defered completion.
                 */
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+               if (is_sync_kiocb(req) && !(flags & MSG_PEEK) &&
+                       (zcopy || size - copied >= conn->snk_zthresh) &&
+                        (conn->src_recv ||
+                                (low_water - copied >= conn->snk_zthresh))) {
+                       struct iovec *msg_iov;
+                       size_t len;
+                       /*
+                        * create IOCB with remaining space
+                        */
+                       iocb = sdp_iocb_create();
+                       if (!iocb) {
+                               sdp_dbg_warn(conn,
+                                            "Error allocating IOCB <%Zu:%Zd>",
+                                            size, copied);
+                               result = -ENOMEM;
+                               break;
+                       }
+
+                       for (msg_iov = msg->msg_iov; !msg->msg_iov->iov_len; 
++msg_iov);
+
+                       /* FMR alignment can add an extra page. */
+                       len = min(msg_iov->iov_len, (size_t)SDP_IOCB_SIZE_MAX - 
4096);
+                       iocb->len  = len;
+                       iocb->post = 0;
+                       iocb->size = len;
+                       iocb->req  = req;
+                       iocb->key  = req->ki_key;
+                       iocb->addr = (unsigned long)msg_iov->iov_base;
+                       iocb->wait = &wait;
+
+                       iocb->flags |= SDP_IOCB_F_RECV | SDP_IOCB_F_WAITALL;
+
+                       req->ki_cancel = sdp_inet_read_cancel;
+
+                       result = sdp_iocb_lock(iocb);
+                       if (result < 0) {
+                               sdp_dbg_warn(conn,
+                                            "Error <%d> IOCB lock <%Zu:%Zd>",
+                                            result, size, copied);
+
+                               sdp_iocb_destroy(iocb);
+                               break;
+                       }
+
+                       SDP_CONN_STAT_RQ_INC(conn, iocb->size);
+
+                       if (!zcopy) {
+                               init_waitqueue_head(&wait.wait);
+                               INIT_LIST_HEAD(&wait.src_wait_list);
+                               spin_lock_init(&wait.lock);
+                               sdp_iocb_q_init(&wait.q);
+                               wait.outstanding = 0;
+                               zcopy  = 1;
+                       }
+
+                       sdp_iocb_q_put_tail(&conn->r_pend, iocb);
+
+                       spin_lock_irqsave(&wait.lock, f);
+                       ++wait.outstanding;
+                       spin_unlock_irqrestore(&wait.lock, f);
+
+                       /* TODO: set it? */
+                       ack = 1;
+                       copied += len;
+                       msg_iov->iov_len -= len;
+                       msg_iov->iov_base += len;
+                       break;
+               } else
+#endif
                if (is_sync_kiocb(req)) {
                        DECLARE_WAITQUEUE(wait, current);
 
@@ -1325,7 +1578,7 @@
                        iocb = sdp_iocb_create();
                        if (!iocb) {
                                sdp_dbg_warn(conn,
-                                            "Error allocating IOCB <%Zu:%d>",
+                                            "Error allocating IOCB <%Zu:%Zd>",
                                             size, copied);
                                result = -ENOMEM;
                                break;
@@ -1346,7 +1599,7 @@
                        result = sdp_iocb_lock(iocb);
                        if (result < 0) {
                                sdp_dbg_warn(conn,
-                                            "Error <%d> IOCB lock <%Zu:%d>",
+                                            "Error <%d> IOCB lock <%Zu:%Zd>",
                                             result, size, copied);
 
                                sdp_iocb_destroy(iocb);
@@ -1382,6 +1635,43 @@
                while ((buff = sdp_buff_q_get_tail(&peek_queue)))
                        sdp_buff_q_put_head(&conn->recv_pool, buff);
 
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+       /* If WAITALL is clear, wake up also when we run out of src avail */
+       if (!result && is_sync_kiocb(req) && zcopy && !(flags & MSG_WAITALL)) {
+               list_add_tail(&conn->src_wait_list, &wait.src_wait_list);
+       }
+#endif
        sdp_conn_unlock(conn);
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+       if (!result && is_sync_kiocb(req) && zcopy) {
+               timeout = wait_event_interruptible_timeout(wait.wait,
+                          !sdp_req_busy(req, conn, &wait,
+                                        (flags & MSG_WAITALL), &copied),
+                                        timeout);
+               if (!timeout) {
+                       result = -EAGAIN;
+                       if (!(flags & MSG_WAITALL)) {
+                               sdp_conn_lock(conn);
+                               list_del_init(&wait.src_wait_list);
+                               sdp_conn_unlock(conn);
+                       }
+               }
+       }
+
+       if (signal_pending(current) && is_sync_kiocb(req) && zcopy) {
+               result = (timeout > 0) ? sock_intr_errno(timeout) : -EAGAIN;
+
+               timeout = wait_event_timeout(wait.wait,
+                               !sdp_cancel_read(req, conn, &wait, &copied),
+                               SDP_ZCOPY_CANCEL_TIMEOUT);
+               if (!timeout) {
+                       sdp_warn("sdp_read_cancel timed out. Abort.\n");
+                       sdp_conn_lock(conn);
+                       sdp_conn_abort(conn);
+                       sdp_conn_unlock(conn);
+               }
+       }
+#endif
+
        return ((copied > 0) ? copied : result);
 }
Index: drivers/infiniband/ulp/sdp/sdp_conn.h
===================================================================
--- drivers/infiniband/ulp/sdp/sdp_conn.h       (revision 3958)
+++ drivers/infiniband/ulp/sdp/sdp_conn.h       (working copy)
@@ -377,6 +377,9 @@
 #ifdef _SDP_CONN_STATE_REC
        struct sdp_conn_state state_rec;
 #endif
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+       struct list_head src_wait_list;
+#endif
 };
 
 #define SDP_WRAP_GT(x, y) ((signed int)((x) - (y)) > 0)
Index: drivers/infiniband/ulp/sdp/sdp_iocb.c
===================================================================
--- drivers/infiniband/ulp/sdp/sdp_iocb.c       (revision 3958)
+++ drivers/infiniband/ulp/sdp/sdp_iocb.c       (working copy)
@@ -307,12 +307,23 @@
        sdp_dbg_data(NULL, "IOCB complete. <%d:%d:%08lx> value <%ld>",
                     iocb->req->ki_users, iocb->req->ki_key,
                     iocb->req->ki_flags, value);
+
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+       if (iocb->wait) {
+               unsigned long flags;
+               spin_lock_irqsave(&iocb->wait->lock, flags);
+               if (!--iocb->wait->outstanding) {
+                       wake_up(&iocb->wait->wait);
+               }
+               spin_unlock_irqrestore(&iocb->wait->lock, flags);
+       } else
+#endif
+               /*
+               * valid result can be 0 or 1 for complete so
+               * we ignore the value.
+               */
+               (void)aio_complete(iocb->req, value, 0);
        /*
-        * valid result can be 0 or 1 for complete so
-        * we ignore the value.
-        */
-       (void)aio_complete(iocb->req, value, 0);
-       /*
         * delete IOCB
         */
        sdp_iocb_destroy(iocb);
@@ -325,7 +336,17 @@
 {
        iocb->status = status;
 
-       if (in_atomic() || irqs_disabled()) {
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+       if ((iocb->flags & SDP_IOCB_F_RECV) && iocb->wait) {
+               unsigned long flags;
+               spin_lock_irqsave(&iocb->wait->lock, flags);
+               sdp_iocb_q_put_tail(&iocb->wait->q, iocb);
+               wake_up(&iocb->wait->wait);
+               spin_unlock_irqrestore(&iocb->wait->lock, flags);
+       } else
+#endif
+       if ((iocb->flags & SDP_IOCB_F_RECV) &&
+           (in_atomic() || irqs_disabled())) {
                INIT_WORK(&iocb->completion, do_iocb_complete, (void *)iocb);
                schedule_work(&iocb->completion);
        } else
@@ -382,6 +403,43 @@
        return NULL;
 }
 
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+struct sdpc_iocb *sdp_iocb_q_lookup_req(struct sdpc_iocb_q *table, struct 
kiocb *req)
+{
+       struct sdpc_iocb *iocb = NULL;
+       int counter;
+
+       for (counter = 0, iocb = table->head; counter < table->size;
+            counter++, iocb = iocb->next)
+               if (iocb->req == req)
+                       return iocb;
+
+       return NULL;
+}
+
+void sdp_iocb_q_mark_cancel(struct sdpc_iocb_q *table, struct kiocb *req)
+{
+       struct sdpc_iocb *iocb = NULL;
+       int counter;
+
+       for (counter = 0, iocb = table->head; counter < table->size;
+            counter++, iocb = iocb->next)
+               if (iocb->req == req)
+                       iocb->flags |= SDP_IOCB_F_CANCEL;
+
+}
+
+int sdp_iocb_find_req(struct sdpc_desc *element, void *arg)
+{
+       struct sdpc_iocb *iocb = (struct sdpc_iocb *) element;
+       struct kiocb *req = (struct kiocb *)arg;
+
+       if (element->type == SDP_DESC_TYPE_IOCB && iocb->req == req)
+               return 0;
+       return -ERANGE;
+}
+#endif
+
 /*
  * sdp_iocb_create - create an IOCB object
  */
Index: drivers/infiniband/ulp/sdp/sdp_iocb.h
===================================================================
--- drivers/infiniband/ulp/sdp/sdp_iocb.h       (revision 3958)
+++ drivers/infiniband/ulp/sdp/sdp_iocb.h       (working copy)
@@ -55,6 +55,9 @@
 #define SDP_IOCB_F_LOCKED 0x00000040 /* IOCB is locked in memory */
 #define SDP_IOCB_F_REG    0x00000080 /* IOCB memory is registered */
 #define SDP_IOCB_F_RECV   0x00000100 /* IOCB is for a receive request */
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+#define SDP_IOCB_F_WAITALL   0x00000200 /* IOCB is for WAITALL request */
+#endif
 #define SDP_IOCB_F_ALL    0xFFFFFFFF /* IOCB all mask */
 /*
  * zcopy constants.
@@ -66,10 +69,12 @@
  */
 #define sdp_iocb_q_size(table) ((table)->size)
 
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+struct sdpc_iocb_wait;
+#endif
 /*
  * INET read/write IOCBs
  */
-
 /*
  * save a kvec read/write for processing once data shows up.
  */
@@ -80,7 +85,7 @@
        struct sdpc_iocb_q *table; /* table to which this iocb belongs */
        void (*release)(struct sdpc_iocb *iocb); /* release the object */
        /*
-        * iocb sepcific
+        * iocb specific
         */
        int      flags;  /* usage flags */
        /*
@@ -112,6 +117,9 @@
        int           page_offset; /* offset into first page. */
 
        struct work_struct completion; /* task for defered completion. */
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+       struct sdpc_iocb_wait *wait;
+#endif
        /*
         * kernel iocb structure
         */
@@ -127,4 +135,26 @@
        int size;               /* current number of IOCBs in table */
 };
 
+#ifdef CONFIG_INFINIBAND_SDP_SEND_ZCOPY
+/* Report completions here */
+struct sdpc_iocb_wait {
+       spinlock_t lock;
+       int outstanding;
+       wait_queue_head_t wait;
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+       struct sdpc_iocb_q q; /* Receive iocbs only */
+       struct list_head src_wait_list; /* Receive only */
+#endif
+};
+
+#ifdef CONFIG_INFINIBAND_SDP_RECV_ZCOPY
+static inline void sdp_iocb_wake(struct list_head *head)
+{
+       struct sdpc_iocb_wait *wait;
+       list_for_each_entry(wait, head, src_wait_list)
+               wake_up(&wait->wait);
+}
+#endif
+
+#endif
 #endif /* _SDP_IOCB_H */

-- 
MST
_______________________________________________
openib-general mailing list
[email protected]
http://openib.org/mailman/listinfo/openib-general

To unsubscribe, please visit http://openib.org/mailman/listinfo/openib-general

Reply via email to