Hotspot performance analysis using VTune showed pthread_mutex_unlock() as the most significant hotspot when transferring small messages using rstream. To reduce the impact of using pthread mutexes, replace them with a custom lock built using an atomic variable and a semaphore. When there's no contention for the lock (which is the expected case for nonblocking sockets), the synchronization is reduced to incrementing and decrementing an atomic variable.
A test that acquired and released a lock 2 billion times reported that the custom lock was roughly 20% faster than using the mutex: 26.6 seconds versus 33.0 seconds. Unfortunately, further analysis showed that using the custom lock provided only a minimal performance gain on rstream itself, and simply moved the hotspot to the custom unlock call. The hotspot is likely a result of some other interaction, rather than being caused by slowness in releasing a lock. However, we keep the custom lock based on the results of the direct lock tests that were done. Signed-off-by: Sean Hefty <[email protected]> --- The pthread mutex implementation may very well use an atomic variable around some sort of futex. I gave up trying to find the pthread source code. As to the hotspot, the unlock in question occurs during rsend(). The hotspot may simply be the result of processing the send completion. src/cma.h | 28 +++++++++++++++++++++ src/rsocket.c | 77 +++++++++++++++++++++------------------------------------ 2 files changed, 56 insertions(+), 49 deletions(-) diff --git a/src/cma.h b/src/cma.h index 91528c0..f28020e 100644 --- a/src/cma.h +++ b/src/cma.h @@ -42,6 +42,7 @@ #include <errno.h> #include <endian.h> #include <byteswap.h> +#include <semaphore.h> #include <rdma/rdma_cma.h> @@ -68,6 +69,33 @@ static inline uint64_t ntohll(uint64_t x) { return x; } #define min(a, b) (a < b ? a : b) +/* + * Fast synchronization for low contention locking. 
+ */ +typedef struct { + sem_t sem; + volatile int cnt; +} fastlock_t; +static inline void fastlock_init(fastlock_t *lock) +{ + sem_init(&lock->sem, 0, 0); + lock->cnt = 0; +} +static inline void fastlock_destroy(fastlock_t *lock) +{ + sem_destroy(&lock->sem); +} +static inline void fastlock_acquire(fastlock_t *lock) +{ + if (__sync_add_and_fetch(&lock->cnt, 1) > 1) + sem_wait(&lock->sem); +} +static inline void fastlock_release(fastlock_t *lock) +{ + if (__sync_sub_and_fetch(&lock->cnt, 1) > 0) + sem_post(&lock->sem); +} + int ucma_complete(struct rdma_cm_id *id); static inline int ERR(int err) { diff --git a/src/rsocket.c b/src/rsocket.c index 775e9b0..2ffde9b 100644 --- a/src/rsocket.c +++ b/src/rsocket.c @@ -141,11 +141,10 @@ enum rs_state { struct rsocket { struct rdma_cm_id *cm_id; - pthread_mutex_t slock; - pthread_mutex_t rlock; - pthread_mutex_t cq_lock; - pthread_cond_t cq_cond; - int cq_busy; + fastlock_t slock; + fastlock_t rlock; + fastlock_t cq_lock; + fastlock_t cq_wait_lock; int opts; long fd_flags; @@ -225,10 +224,10 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs) rs->index = -1; rs->sbuf_size = inherited_rs ? inherited_rs->sbuf_size : RS_BUF_SIZE; rs->rbuf_size = inherited_rs ? 
inherited_rs->rbuf_size : RS_BUF_SIZE; - pthread_mutex_init(&rs->slock, NULL); - pthread_mutex_init(&rs->rlock, NULL); - pthread_mutex_init(&rs->cq_lock, NULL); - pthread_cond_init(&rs->cq_cond, NULL); + fastlock_init(&rs->slock); + fastlock_init(&rs->rlock); + fastlock_init(&rs->cq_lock); + fastlock_init(&rs->cq_wait_lock); return rs; } @@ -375,10 +374,10 @@ static void rs_free(struct rsocket *rs) rdma_destroy_id(rs->cm_id); } - pthread_cond_destroy(&rs->cq_cond); - pthread_mutex_destroy(&rs->cq_lock); - pthread_mutex_destroy(&rs->rlock); - pthread_mutex_destroy(&rs->slock); + fastlock_destroy(&rs->cq_wait_lock); + fastlock_destroy(&rs->cq_lock); + fastlock_destroy(&rs->rlock); + fastlock_destroy(&rs->slock); free(rs); } @@ -833,13 +832,6 @@ static int rs_get_cq_event(struct rsocket *rs) return ret; } -static void rs_signal_cq_waiters(struct rsocket *rs) -{ - pthread_mutex_lock(&rs->cq_lock); - pthread_cond_signal(&rs->cq_cond); - pthread_mutex_unlock(&rs->cq_lock); -} - /* * Although we serialize rsend and rrecv calls with respect to themselves, * both calls may run simultaneously and need to poll the CQ for completions. @@ -850,31 +842,15 @@ static void rs_signal_cq_waiters(struct rsocket *rs) * which could be stalled until the remote process calls rrecv. This should * not block rrecv from receiving data from the remote side however. * - * Perform a quick test before trying to acquire any locks. Also note that - * busy may be set to 1 by another thread once it's been reset to 0. + * We handle this by using two locks. The cq_lock protects against polling + * the CQ and processing completions. The cq_wait_lock serializes access to + * waiting on the CQ. 
*/ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rsocket *rs)) { int ret; - pthread_mutex_lock(&rs->cq_lock); - if (rs->cq_busy && nonblock) { - pthread_mutex_unlock(&rs->cq_lock); - return ERR(EWOULDBLOCK); - } - - while (rs->cq_busy && !test(rs)) { - pthread_cond_wait(&rs->cq_cond, &rs->cq_lock); - - if (test(rs)) { - pthread_mutex_unlock(&rs->cq_lock); - return 0; - } - } - - rs->cq_busy = 1; - pthread_mutex_unlock(&rs->cq_lock); - + fastlock_acquire(&rs->cq_lock); do { rs_update_credits(rs); ret = rs_poll_cq(rs); @@ -890,14 +866,17 @@ static int rs_process_cq(struct rsocket *rs, int nonblock, int (*test)(struct rs rs->cq_armed = 1; } else { rs_update_credits(rs); - rs_signal_cq_waiters(rs); + fastlock_acquire(&rs->cq_wait_lock); + fastlock_release(&rs->cq_lock); + ret = rs_get_cq_event(rs); + fastlock_release(&rs->cq_wait_lock); + fastlock_acquire(&rs->cq_lock); } } while (!ret); rs_update_credits(rs); - rs->cq_busy = 0; - rs_signal_cq_waiters(rs); + fastlock_release(&rs->cq_lock); return ret; } @@ -1002,7 +981,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) return ret; } } - pthread_mutex_lock(&rs->rlock); + fastlock_acquire(&rs->rlock); if (!rs_have_rdata(rs)) { ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_have_rdata); if (ret && errno != ECONNRESET) @@ -1040,7 +1019,7 @@ ssize_t rrecv(int socket, void *buf, size_t len, int flags) } rs->rbuf_bytes_avail += len - left; out: - pthread_mutex_unlock(&rs->rlock); + fastlock_release(&rs->rlock); return ret ? 
ret : len - left; } @@ -1105,7 +1084,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) } } - pthread_mutex_lock(&rs->slock); + fastlock_acquire(&rs->slock); for (left = len; left; left -= xfer_size, buf += xfer_size) { if (!rs_can_send(rs)) { ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send); @@ -1157,7 +1136,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags) if (ret) break; } - pthread_mutex_unlock(&rs->slock); + fastlock_release(&rs->slock); return (ret && left == len) ? ret : len - left; } @@ -1214,7 +1193,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags for (i = 1; i < iovcnt; i++) len += iov[i].iov_len; - pthread_mutex_lock(&rs->slock); + fastlock_acquire(&rs->slock); for (left = len; left; left -= xfer_size) { if (!rs_can_send(rs)) { ret = rs_process_cq(rs, rs_nonblocking(rs, flags), rs_can_send); @@ -1262,7 +1241,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags if (ret) break; } - pthread_mutex_unlock(&rs->slock); + fastlock_release(&rs->slock); return (ret && left == len) ? ret : len - left; } -- To unsubscribe from this list: send the line "unsubscribe linux-rdma" in the body of a message to [email protected] More majordomo info at http://vger.kernel.org/majordomo-info.html
