Hotspot performance analysis using VTune showed pthread_mutex_unlock()
as the most significant hotspot when transferring small messages using
rstream.  To reduce the impact of using pthread mutexes, replace it
with a custom lock built using an atomic variable and a semaphore.
When there's no contention for the lock (which is the expected case
for nonblocking sockets), the synchronization is reduced to
incrementing and decrementing an atomic variable.

A test that acquired and released a lock 2 billion times reported that
the custom lock was roughly 20% faster than using the mutex.
26.6 seconds versus 33.0 seconds.

Unfortunately, further analysis showed that using the custom lock
provided a minimal performance gain on rstream itself, and simply
moved the hotspot to the custom unlock call.  The hotspot is likely
a result of some other interaction, rather than caused by slowness
in releasing a lock.  However, we keep the custom lock based on
the results of the direct lock tests that were done.

Signed-off-by: Sean Hefty <[email protected]>
---
The pthread mutex implementation may very well use an atomic variable
around some sort of futex.  I gave up trying to find the pthread
source code.

As to the hotspot, the unlock in question occurs during rsend().  The
hotspot may simply be the result of processing the send completion.

 src/cma.h     |   28 +++++++++++++++++++++
 src/rsocket.c |   77 +++++++++++++++++++++------------------------------------
 2 files changed, 56 insertions(+), 49 deletions(-)

diff --git a/src/cma.h b/src/cma.h
index 91528c0..f28020e 100644
--- a/src/cma.h
+++ b/src/cma.h
@@ -42,6 +42,7 @@
 #include <errno.h>
 #include <endian.h>
 #include <byteswap.h>
+#include <semaphore.h>
 
 #include <rdma/rdma_cma.h>
 
@@ -68,6 +69,33 @@ static inline uint64_t ntohll(uint64_t x) { return x; }
 
 #define min(a, b) (a < b ? a : b)
 
+/*
+ * Fast synchronization for low contention locking.
+ */
+typedef struct {
+       sem_t sem;
+       volatile int cnt;
+} fastlock_t;
+static inline void fastlock_init(fastlock_t *lock)
+{
+       sem_init(&lock->sem, 0, 0);
+       lock->cnt = 0;
+}
+static inline void fastlock_destroy(fastlock_t *lock)
+{
+       sem_destroy(&lock->sem);
+}
+static inline void fastlock_acquire(fastlock_t *lock)
+{
+       if (__sync_add_and_fetch(&lock->cnt, 1) > 1)
+               sem_wait(&lock->sem);
+}
+static inline void fastlock_release(fastlock_t *lock)
+{
+       if (__sync_sub_and_fetch(&lock->cnt, 1) > 0)
+               sem_post(&lock->sem);
+}
+
 int ucma_complete(struct rdma_cm_id *id);
 static inline int ERR(int err)
 {
diff --git a/src/rsocket.c b/src/rsocket.c
index 775e9b0..2ffde9b 100644
--- a/src/rsocket.c
+++ b/src/rsocket.c
@@ -141,11 +141,10 @@ enum rs_state {
 
 struct rsocket {
        struct rdma_cm_id *cm_id;
-       pthread_mutex_t   slock;
-       pthread_mutex_t   rlock;
-       pthread_mutex_t   cq_lock;
-       pthread_cond_t    cq_cond;
-       int               cq_busy;
+       fastlock_t        slock;
+       fastlock_t        rlock;
+       fastlock_t        cq_lock;
+       fastlock_t        cq_wait_lock;
 
        int               opts;
        long              fd_flags;
@@ -225,10 +224,10 @@ static struct rsocket *rs_alloc(struct rsocket 
*inherited_rs)
        rs->index = -1;
        rs->sbuf_size = inherited_rs ? inherited_rs->sbuf_size : RS_BUF_SIZE;
        rs->rbuf_size = inherited_rs ? inherited_rs->rbuf_size : RS_BUF_SIZE;
-       pthread_mutex_init(&rs->slock, NULL);
-       pthread_mutex_init(&rs->rlock, NULL);
-       pthread_mutex_init(&rs->cq_lock, NULL);
-       pthread_cond_init(&rs->cq_cond, NULL);
+       fastlock_init(&rs->slock);
+       fastlock_init(&rs->rlock);
+       fastlock_init(&rs->cq_lock);
+       fastlock_init(&rs->cq_wait_lock);
        return rs;
 }
 
@@ -375,10 +374,10 @@ static void rs_free(struct rsocket *rs)
                rdma_destroy_id(rs->cm_id);
        }
 
-       pthread_cond_destroy(&rs->cq_cond);
-       pthread_mutex_destroy(&rs->cq_lock);
-       pthread_mutex_destroy(&rs->rlock);
-       pthread_mutex_destroy(&rs->slock);
+       fastlock_destroy(&rs->cq_wait_lock);
+       fastlock_destroy(&rs->cq_lock);
+       fastlock_destroy(&rs->rlock);
+       fastlock_destroy(&rs->slock);
        free(rs);
 }
 
@@ -833,13 +832,6 @@ static int rs_get_cq_event(struct rsocket *rs)
        return ret;
 }
 
-static void rs_signal_cq_waiters(struct rsocket *rs)
-{
-       pthread_mutex_lock(&rs->cq_lock);
-       pthread_cond_signal(&rs->cq_cond);
-       pthread_mutex_unlock(&rs->cq_lock);
-}
-
 /*
  * Although we serialize rsend and rrecv calls with respect to themselves,
  * both calls may run simultaneously and need to poll the CQ for completions.
@@ -850,31 +842,15 @@ static void rs_signal_cq_waiters(struct rsocket *rs)
  * which could be stalled until the remote process calls rrecv.  This should
  * not block rrecv from receiving data from the remote side however.
  *
- * Perform a quick test before trying to acquire any locks.  Also note that
- * busy may be set to 1 by another thread once it's been reset to 0.
+ * We handle this by using two locks.  The cq_lock protects against polling
+ * the CQ and processing completions.  The cq_wait_lock serializes access to
+ * waiting on the CQ.
  */
 static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct 
rsocket *rs))
 {
        int ret;
 
-       pthread_mutex_lock(&rs->cq_lock);
-       if (rs->cq_busy && nonblock) {
-               pthread_mutex_unlock(&rs->cq_lock);
-               return ERR(EWOULDBLOCK);
-       }
-
-       while (rs->cq_busy && !test(rs)) {
-               pthread_cond_wait(&rs->cq_cond, &rs->cq_lock);
-
-               if (test(rs)) {
-                       pthread_mutex_unlock(&rs->cq_lock);
-                       return 0;
-               }
-       }
-
-       rs->cq_busy = 1;
-       pthread_mutex_unlock(&rs->cq_lock);
-
+       fastlock_acquire(&rs->cq_lock);
        do {
                rs_update_credits(rs);
                ret = rs_poll_cq(rs);
@@ -890,14 +866,17 @@ static int rs_process_cq(struct rsocket *rs, int 
nonblock, int (*test)(struct rs
                        rs->cq_armed = 1;
                } else {
                        rs_update_credits(rs);
-                       rs_signal_cq_waiters(rs);
+                       fastlock_acquire(&rs->cq_wait_lock);
+                       fastlock_release(&rs->cq_lock);
+
                        ret = rs_get_cq_event(rs);
+                       fastlock_release(&rs->cq_wait_lock);
+                       fastlock_acquire(&rs->cq_lock);
                }
        } while (!ret);
 
        rs_update_credits(rs);
-       rs->cq_busy = 0;
-       rs_signal_cq_waiters(rs);
+       fastlock_release(&rs->cq_lock);
        return ret;
 }
 
@@ -1002,7 +981,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags)
                        return ret;
                }
        }
-       pthread_mutex_lock(&rs->rlock);
+       fastlock_acquire(&rs->rlock);
        if (!rs_have_rdata(rs)) {
                ret = rs_process_cq(rs, rs_nonblocking(rs, flags), 
rs_have_rdata);
                if (ret && errno != ECONNRESET)
@@ -1040,7 +1019,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int 
flags)
        }
        rs->rbuf_bytes_avail += len - left;
 out:
-       pthread_mutex_unlock(&rs->rlock);
+       fastlock_release(&rs->rlock);
        return ret ? ret : len - left;
 }
 
@@ -1105,7 +1084,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, 
int flags)
                }
        }
 
-       pthread_mutex_lock(&rs->slock);
+       fastlock_acquire(&rs->slock);
        for (left = len; left; left -= xfer_size, buf += xfer_size) {
                if (!rs_can_send(rs)) {
                        ret = rs_process_cq(rs, rs_nonblocking(rs, flags), 
rs_can_send);
@@ -1157,7 +1136,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, 
int flags)
                if (ret)
                        break;
        }
-       pthread_mutex_unlock(&rs->slock);
+       fastlock_release(&rs->slock);
 
        return (ret && left == len) ? ret : len - left;
 }
@@ -1214,7 +1193,7 @@ static ssize_t rsendv(int socket, const struct iovec 
*iov, int iovcnt, int flags
        for (i = 1; i < iovcnt; i++)
                len += iov[i].iov_len;
 
-       pthread_mutex_lock(&rs->slock);
+       fastlock_acquire(&rs->slock);
        for (left = len; left; left -= xfer_size) {
                if (!rs_can_send(rs)) {
                        ret = rs_process_cq(rs, rs_nonblocking(rs, flags), 
rs_can_send);
@@ -1262,7 +1241,7 @@ static ssize_t rsendv(int socket, const struct iovec 
*iov, int iovcnt, int flags
                if (ret)
                        break;
        }
-       pthread_mutex_unlock(&rs->slock);
+       fastlock_release(&rs->slock);
 
        return (ret && left == len) ? ret : len - left;
 }


--
To unsubscribe from this list: send the line "unsubscribe linux-rdma" in
the body of a message to [email protected]
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Reply via email to