We introduce rsocket extensions for supporting direct data
placement (also known as zero copy). Direct data placement avoids
copying data into network buffers when sending or receiving data.
This patch implements zero copy on the receive side, with some
basic framework in place for supporting it on the send side.
Integrating zero copy support into the existing socket APIs is
difficult to achieve when sockets are set to nonblocking; any such
implementation is likely to be unusable in practice. The problem
stems from the fact that socket operations are synchronous in
nature, with asynchronous support limited to connection
establishment.
Therefore we introduce new calls to handle direct data placement.
The use of the new calls is optional and does not affect the
use of the existing calls. An attempt is made to have the new
routines integrate naturally with the existing APIs. The new
functions are: riomap, riounmap, and riowrite. The basic operation
can be described as follows:
1. App A calls riomap to register a data buffer with the local
RDMA device. Riomap returns an off_t offset value that
corresponds to the registered data buffer. The app may
select the offset value.
2. Rsockets will transmit an internal message to the remote
peer with information about the registration. This exchange
is hidden from the applications.
3. App A sends a notification message to app B indicating that
the remote iomapped buffer is now available to receive data.
4. App B calls riowrite to transmit data directly into the
riomapped data buffer.
5. App B sends a notification message to app A indicating that
data is available in the mapped buffer.
6. After all transfers are complete, app A calls riounmap to
deregister its data buffer.
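As a rough sketch of the flow above (hypothetical rsocket fd's rs_a
and rs_b on an established connection; error handling omitted):

    /* App A: expose a buffer for direct placement (steps 1, 3, 5, 6) */
    char data[4096];
    uint8_t done;
    off_t off;

    off = riomap(rs_a, data, sizeof data, PROT_WRITE, 0, -1);
    rsend(rs_a, &off, sizeof off, 0);    /* step 3: notify app B */
    rrecv(rs_a, &done, sizeof done, 0);  /* step 5: wait for app B */
    riounmap(rs_a, data, sizeof data);   /* step 6 */

    /* App B: write directly into app A's mapped buffer (steps 4, 5) */
    off_t off;
    uint8_t done = 1;

    rrecv(rs_b, &off, sizeof off, 0);     /* learn the offset from app A */
    riowrite(rs_b, src, src_len, off, 0); /* src, src_len: app B's data */
    rsend(rs_b, &done, sizeof done, 0);   /* step 5: tell app A data landed */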
Riomap and riounmap are functionally equivalent to RDMA
memory registration and deregistration routines. They are loosely
based on the mmap and munmap APIs.
off_t riomap(int socket, void *buf, size_t len,
int prot, int flags, off_t offset)
Riomap registers an application buffer with the RDMA hardware
associated with an rsocket. The buffer is registered either for
local-only access (PROT_NONE) or for remote write access
(PROT_WRITE). When registered for remote access, the buffer is
mapped to a given offset. The offset is provided by the user or,
if the user passes -1, selected by rsockets. The remote peer may
access an iomapped buffer directly by specifying the correct
offset. The mapping is not guaranteed to be available until after
the remote peer receives a data transfer initiated after riomap
has completed.
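For example (fd, buf, and len as set up by the application; error
checking elided), an application can request a specific offset or
let rsockets choose one:

    off_t off;

    /* map buf at an application-chosen offset */
    off = riomap(fd, buf, len, PROT_WRITE, 0, 0x1000);

    /* or have rsockets select the offset */
    off = riomap(fd, buf, len, PROT_WRITE, 0, -1);
    if (off == -1)
        perror("riomap");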
int riounmap(int socket, void *buf, size_t len)
Riounmap removes the mapping between a buffer and an rsocket.
size_t riowrite(int socket, const void *buf, size_t count,
off_t offset, int flags)
Riowrite allows an application to transfer data over an rsocket
directly into a remotely iomapped buffer. The target buffer is
specified through the offset parameter, which must correspond to a
remotely iomapped buffer. From the sender's perspective, riowrite
behaves similarly to rwrite. From the receiver's view, riowrite
transfers are silently redirected into a predetermined data buffer.
Data is received automatically, and the receiver is not informed of
the transfer. However, riowrite data is still considered part of
the data stream, such that it will be written before any subsequent
transfer is received. A message sent immediately after initiating a
riowrite may be used to notify the receiver of the transfer.
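For instance, assuming the offset was obtained from the peer as
described above, a sender might pair each direct transfer with a
small notification message:

    size_t rc = riowrite(fd, src, src_len, off, 0);
    if (rc == src_len) {
        /* riowrite data stays in the data stream, so this message is
         * delivered only after the directly placed data is visible */
        rsend(fd, &rc, sizeof rc, 0);
    }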
Note that the current implementation focuses primarily on being
functional for evaluation purposes. Some checks have been deferred
to subsequent patches, and performance is currently limited by
linear lookups.
Signed-off-by: Sean Hefty <[email protected]>
---
docs/rsocket | 51 +++++
include/rdma/rsocket.h | 10 +
man/rsocket.7 | 53 +++++
src/indexer.h | 43 ++++
src/librdmacm.map | 3
src/rsocket.c | 521 +++++++++++++++++++++++++++++++++++++++++++++---
6 files changed, 640 insertions(+), 41 deletions(-)
diff --git a/docs/rsocket b/docs/rsocket
index 5399f6c..1484f65 100644
--- a/docs/rsocket
+++ b/docs/rsocket
@@ -110,11 +110,11 @@ Bits Message Meaning of
31:29 Type Bits 28:0
000 Data Transfer bytes transfered
001 reserved
-010 reserved
+010 reserved - used internally, available for future use
011 reserved
100 Credit Update received credits granted
101 reserved
-110 reserved
+110 Iomap Updated index of updated entry
111 Control control message type
Data Transfer
@@ -133,6 +133,12 @@ care not to modify a remote target SGL while it may be in
use. This is done
by tracking when a receive buffer referenced by a remote target SGL has been
filled.
+Iomap Updated
+Used to indicate that a remote iomap entry was updated. The updated entry
+contains the offset value associated with an address, length, and rkey. Once
+an iomap has been updated, the local application can issue directed IO
+transfers against the corresponding remote buffer.
+
Control Message - DISCONNECT
Indicates that the rsocket connection has been fully disconnected and will no
longer send or receive data. Data received before the disconnect message was
@@ -142,3 +148,44 @@ Control Message - SHUTDOWN
Indicates that the remote rsocket has shutdown the send side of its
connection. The recipient of a shutdown message will no longer accept
incoming data, but may still transfer outbound data.
+
+
+Iomapped Buffers
+----------------
+Rsockets allows for zero-copy transfers using what it refers to as iomapped
+buffers. Iomapping and direct data placement (zero-copy) transfers are done
+using rsocket specific extensions. The general operation is similar to
+that used for normal data transfers described above.
+
+ host A host B
+ remote iomap
+ target iomap <----------- [ ]
+ [ ] ------
+ [ ] -- ------ iomapped buffer(s)
+ -- -----> +--+
+ -- | |
+ -- | |
+ -- | |
+ -- +--+
+ --
+ ---> +--+
+ | |
+ | |
+ +--+
+
+The remote iomap contains the address, size, and rkey of the target iomap. As
+the application on host B maps buffers to a given rsocket, rsockets will issue
+an RDMA write against one of the entries in the target iomap on host A. The
+updated entry will reference an available iomapped buffer. Immediate data
+included with the RDMA write will indicate to host A that a target iomap
+has been updated.
+
+When host A wishes to transfer directly into an iomapped buffer, it will check
+its target iomap for an offset corresponding to a remotely mapped buffer. A
+matching iomap entry will contain the address, size, and rkey of the target
+buffer on host B. Host A will then issue an RDMA operation against the
+registered remote data buffer.
+
+From host A's perspective, the transfer appears as a normal send/write
+operation, with the data stream redirected directly into the receiving
+application's buffer.
diff --git a/include/rdma/rsocket.h b/include/rdma/rsocket.h
index 65feda9..f220c13 100644
--- a/include/rdma/rsocket.h
+++ b/include/rdma/rsocket.h
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2011 Intel Corporation. All rights reserved.
+ * Copyright (c) 2011-2012 Intel Corporation. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
@@ -39,6 +39,7 @@
#include <errno.h>
#include <poll.h>
#include <sys/select.h>
+#include <sys/mman.h>
#ifdef __cplusplus
extern "C" {
@@ -76,7 +77,8 @@ int rgetsockname(int socket, struct sockaddr *addr, socklen_t *addrlen);
enum {
RDMA_SQSIZE,
RDMA_RQSIZE,
- RDMA_INLINE
+ RDMA_INLINE,
+ RDMA_IOMAPSIZE
};
int rsetsockopt(int socket, int level, int optname,
@@ -85,6 +87,10 @@ int rgetsockopt(int socket, int level, int optname,
void *optval, socklen_t *optlen);
int rfcntl(int socket, int cmd, ... /* arg */ );
+off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset);
+int riounmap(int socket, void *buf, size_t len);
+size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags);
+
#ifdef __cplusplus
}
#endif
diff --git a/man/rsocket.7 b/man/rsocket.7
index 2ed5ca4..bc5bb10 100644
--- a/man/rsocket.7
+++ b/man/rsocket.7
@@ -6,9 +6,9 @@ rsocket \- RDMA socket API
.SH "DESCRIPTION"
RDMA socket API and protocol
.SH "NOTES"
-rsockets is a protocol over RDMA that supports a socket-level API
-for applications. rsocket APIs are intended to match the behavior
-of corresponding socket calls, except where noted. rsocket
+Rsockets is a protocol over RDMA that supports a socket-level API
+for applications. Rsocket APIs are intended to match the behavior
+of corresponding socket calls, except where noted. Rsocket
functions match the name and function signature of socket calls,
with the exception that all function calls are prefixed with an 'r'.
.P
@@ -30,7 +30,7 @@ rgetpeername, rgetsockname
.P
rsetsockopt, rgetsockopt, rfcntl
.P
-Functions take the same parameters as that use for sockets. The
+Functions take the same parameters as those used for sockets. The
follow capabilities and flags are supported at this time:
.P
PF_INET, PF_INET6, SOCK_STREAM, IPPROTO_TCP, TCP_MAXSEG
@@ -41,6 +41,47 @@ SO_REUSEADDR, TCP_NODELAY, SO_ERROR, SO_SNDBUF, SO_RCVBUF
.P
O_NONBLOCK
.P
+Rsockets provides extensions beyond normal socket routines that
+allow for direct placement of data into an application's buffer.
+This is also known as zero-copy support, since data is sent and
+received directly, bypassing copies into network controlled buffers.
+The following calls and options support direct data placement.
+.P
+riomap, riounmap, riowrite
+.TP
+off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset)
+.TP
+Riomap registers an application buffer with the RDMA hardware
+associated with an rsocket. The buffer is registered either for
+local-only access (PROT_NONE) or for remote write access (PROT_WRITE).
+When registered for remote access, the buffer is mapped to a given
+offset. The offset is provided by the user or, if the user passes
+-1 for the offset, rsockets selects one. The remote peer may
+access an iomapped buffer directly by specifying the correct offset.
+The mapping is not guaranteed to be available until after the remote
+peer receives a data transfer initiated after riomap has completed.
+.P
+riounmap
+.TP
+int riounmap(int socket, void *buf, size_t len)
+.TP
+Riounmap removes the mapping between a buffer and an rsocket.
+.P
+riowrite
+.TP
+size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags)
+.TP
+Riowrite allows an application to transfer data over an rsocket
+directly into a remotely iomapped buffer. The target buffer is specified
+through the offset parameter, which must correspond to a remotely iomapped buffer.
+From the sender's perspective, riowrite behaves similarly to rwrite. From
+a receiver's view, riowrite transfers are silently redirected into a
+predetermined data buffer. Data is received automatically, and the receiver
+is not informed of the transfer. However, riowrite data is still considered
+part of the data stream, such that it will be written before any
+subsequent transfer is received. A message sent immediately after initiating
+a riowrite may be used to notify the receiver of the transfer.
+.P
In addition to standard socket options, rsockets supports options
specific to RDMA devices and protocols. These options are accessible
through rsetsockopt using SOL_RDMA option level.
@@ -50,6 +91,8 @@ RDMA_SQSIZE - Integer size of the underlying send queue.
RDMA_RQSIZE - Integer size of the underlying receive queue.
.TP
RDMA_INLINE - Integer size of inline data.
+.TP
+RDMA_IOMAPSIZE - Integer number of remote IO mappings supported.
.P
Note that rsockets fd's cannot be passed into non-rsocket calls. For
applications which must mix rsocket fd's with standard socket fd's or
@@ -84,6 +127,8 @@ rqsize_default - default size of receive queue
.P
inline_default - default size of inline data
.P
+iomap_size - default size of remote iomapping table
+.P
If configuration files are not available, rsockets uses internal defaults.
.SH "SEE ALSO"
rdma_cm(7)
diff --git a/src/indexer.h b/src/indexer.h
index 26e7f98..0c5f388 100644
--- a/src/indexer.h
+++ b/src/indexer.h
@@ -31,6 +31,9 @@
*
*/
+#if !defined(INDEXER_H)
+#define INDEXER_H
+
#if HAVE_CONFIG_H
# include <config.h>
#endif /* HAVE_CONFIG_H */
@@ -99,3 +102,43 @@ static inline void *idm_lookup(struct index_map *idm, int index)
return ((index <= IDX_MAX_INDEX) && idm->array[idx_array_index(index)]) ?
idm_at(idm, index) : NULL;
}
+
+typedef struct _dlist_entry {
+ struct _dlist_entry *next;
+ struct _dlist_entry *prev;
+} dlist_entry;
+
+static inline void dlist_init(dlist_entry *head)
+{
+ head->next = head;
+ head->prev = head;
+}
+
+static inline int dlist_empty(dlist_entry *head)
+{
+ return head->next == head;
+}
+
+static inline void dlist_insert_after(dlist_entry *item, dlist_entry *head)
+{
+ item->next = head->next;
+ item->prev = head;
+ head->next->prev = item;
+ head->next = item;
+}
+
+static inline void dlist_insert_before(dlist_entry *item, dlist_entry *head)
+{
+ dlist_insert_after(item, head->prev);
+}
+
+#define dlist_insert_head dlist_insert_after
+#define dlist_insert_tail dlist_insert_before
+
+static inline void dlist_remove(dlist_entry *item)
+{
+ item->prev->next = item->next;
+ item->next->prev = item->prev;
+}
+
+#endif /* INDEXER_H */
diff --git a/src/librdmacm.map b/src/librdmacm.map
index 5c317a3..d5ef736 100644
--- a/src/librdmacm.map
+++ b/src/librdmacm.map
@@ -63,5 +63,8 @@ RDMACM_1.0 {
rselect;
rdma_get_src_port;
rdma_get_dst_port;
+ riomap;
+ riounmap;
+ riowrite;
local: *;
};
diff --git a/src/rsocket.c b/src/rsocket.c
index cc5effe..74dbcc7 100644
--- a/src/rsocket.c
+++ b/src/rsocket.c
@@ -55,6 +55,7 @@
#define RS_OLAP_START_SIZE 2048
#define RS_MAX_TRANSFER 65536
+#define RS_SNDLOWAT 64
#define RS_QP_MAX_SIZE 0xFFFE
#define RS_QP_CTRL_SIZE 4
#define RS_CONN_RETRIES 6
@@ -62,6 +63,7 @@
static struct index_map idm;
static pthread_mutex_t mut = PTHREAD_MUTEX_INITIALIZER;
+static uint16_t def_iomap_size = 0;
static uint16_t def_inline = 64;
static uint16_t def_sqsize = 384;
static uint16_t def_rqsize = 384;
@@ -76,19 +78,22 @@ static uint32_t polling_time = 10;
* bit 29: more data, 0 - end of transfer, 1 - more data available
*
* for data transfers:
- * bits [28:0]: bytes transfered, 0 = 1 GB
+ * bits [28:0]: bytes transferred
* for control messages:
+ * SGL, CTRL
* bits [28-0]: receive credits granted
+ * IOMAP_SGL
+ * bits [28-16]: reserved, bits [15-0]: index
*/
enum {
RS_OP_DATA,
RS_OP_RSVD_DATA_MORE,
- RS_OP_RSVD_DRA,
+ RS_OP_WRITE, /* opcode is not transmitted over the network */
RS_OP_RSVD_DRA_MORE,
RS_OP_SGL,
RS_OP_RSVD,
- RS_OP_RSVD_DRA_SGL,
+ RS_OP_IOMAP_SGL,
RS_OP_CTRL
};
#define rs_msg_set(op, data) ((op << 29) | (uint32_t) (data))
@@ -111,15 +116,30 @@ struct rs_sge {
uint32_t length;
};
-#define RS_MIN_INLINE (sizeof(struct rs_sge))
-#define rs_host_is_net() (1 == htonl(1))
-#define RS_CONN_FLAG_NET 1
+struct rs_iomap {
+ uint64_t offset;
+ struct rs_sge sge;
+};
+
+struct rs_iomap_mr {
+ uint64_t offset;
+ struct ibv_mr *mr;
+ dlist_entry entry;
+ atomic_t refcnt;
+ int index; /* -1 if mapping is local and not in iomap_list */
+};
+
+#define RS_MIN_INLINE (sizeof(struct rs_sge))
+#define rs_host_is_net() (1 == htonl(1))
+#define RS_CONN_FLAG_NET (1 << 0)
+#define RS_CONN_FLAG_IOMAP (1 << 1)
struct rs_conn_data {
uint8_t version;
uint8_t flags;
uint16_t credits;
- uint32_t reserved2;
+ uint8_t reserved[3];
+ uint8_t target_iomap_size;
struct rs_sge target_sgl;
struct rs_sge data_buf;
};
@@ -155,6 +175,7 @@ struct rsocket {
fastlock_t rlock;
fastlock_t cq_lock;
fastlock_t cq_wait_lock;
+ fastlock_t iomap_lock;
int opts;
long fd_flags;
@@ -186,10 +207,19 @@ struct rsocket {
int remote_sge;
struct rs_sge remote_sgl;
+ struct rs_sge remote_iomap;
+
+ struct rs_iomap_mr *remote_iomappings;
+ dlist_entry iomap_list;
+ dlist_entry iomap_queue;
+ int iomap_pending;
struct ibv_mr *target_mr;
int target_sge;
- volatile struct rs_sge target_sgl[RS_SGL_SIZE];
+ int target_iomap_size;
+ void *target_buffer_list;
+ volatile struct rs_sge *target_sgl;
+ struct rs_iomap *target_iomap;
uint32_t rbuf_size;
struct ibv_mr *rmr;
@@ -201,6 +231,18 @@ struct rsocket {
uint8_t *sbuf;
};
+static int rs_value_to_scale(int value, int bits)
+{
+ return value <= (1 << (bits - 1)) ?
+ value : (1 << (bits - 1)) | (value >> bits);
+}
+
+static int rs_scale_to_value(int value, int bits)
+{
+ return value <= (1 << (bits - 1)) ?
+ value : (value & ~(1 << (bits - 1))) << bits;
+}
+
void rs_configure(void)
{
FILE *f;
@@ -247,9 +289,17 @@ void rs_configure(void)
if ((f = fopen(RS_CONF_DIR "/wmem_default", "r"))) {
fscanf(f, "%u", &def_wmem);
fclose(f);
+ if (def_wmem < RS_SNDLOWAT)
+ def_wmem = RS_SNDLOWAT << 1;
+ }
+
+ if ((f = fopen(RS_CONF_DIR "/iomap_size", "r"))) {
+ fscanf(f, "%hu", &def_iomap_size);
+ fclose(f);
- if (def_wmem < 1)
- def_wmem = 1;
+ /* round to supported values */
+ def_iomap_size = (uint8_t) rs_value_to_scale(
+ (uint16_t) rs_scale_to_value(def_iomap_size, 8), 8);
}
init = 1;
out:
@@ -287,6 +337,7 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
rs->sq_size = inherited_rs->sq_size;
rs->rq_size = inherited_rs->rq_size;
rs->ctrl_avail = inherited_rs->ctrl_avail;
+ rs->target_iomap_size = inherited_rs->target_iomap_size;
} else {
rs->sbuf_size = def_wmem;
rs->rbuf_size = def_mem;
@@ -294,11 +345,15 @@ static struct rsocket *rs_alloc(struct rsocket *inherited_rs)
rs->sq_size = def_sqsize;
rs->rq_size = def_rqsize;
rs->ctrl_avail = RS_QP_CTRL_SIZE;
+ rs->target_iomap_size = def_iomap_size;
}
fastlock_init(&rs->slock);
fastlock_init(&rs->rlock);
fastlock_init(&rs->cq_lock);
fastlock_init(&rs->cq_wait_lock);
+ fastlock_init(&rs->iomap_lock);
+ dlist_init(&rs->iomap_list);
+ dlist_init(&rs->iomap_queue);
return rs;
}
@@ -336,6 +391,8 @@ static void rs_set_qp_size(struct rsocket *rs)
static int rs_init_bufs(struct rsocket *rs)
{
+ size_t len;
+
rs->rmsg = calloc(rs->rq_size + 1, sizeof(*rs->rmsg));
if (!rs->rmsg)
return -1;
@@ -348,11 +405,21 @@ static int rs_init_bufs(struct rsocket *rs)
if (!rs->smr)
return -1;
- rs->target_mr = rdma_reg_write(rs->cm_id, (void *) rs->target_sgl,
- sizeof(rs->target_sgl));
+ len = sizeof(*rs->target_sgl) * RS_SGL_SIZE +
+ sizeof(*rs->target_iomap) * rs->target_iomap_size;
+ rs->target_buffer_list = malloc(len);
+ if (!rs->target_buffer_list)
+ return -1;
+
+ rs->target_mr = rdma_reg_write(rs->cm_id, rs->target_buffer_list, len);
if (!rs->target_mr)
return -1;
+ memset(rs->target_buffer_list, 0, len);
+ rs->target_sgl = rs->target_buffer_list;
+ if (rs->target_iomap_size)
+ rs->target_iomap = (struct rs_iomap *) (rs->target_sgl + RS_SGL_SIZE);
+
rs->rbuf = calloc(rs->rbuf_size, sizeof(*rs->rbuf));
if (!rs->rbuf)
return -1;
@@ -452,6 +519,35 @@ static int rs_create_ep(struct rsocket *rs)
return 0;
}
+static void rs_release_iomap_mr(struct rs_iomap_mr *iomr)
+{
+ if (atomic_dec(&iomr->refcnt))
+ return;
+
+ dlist_remove(&iomr->entry);
+ ibv_dereg_mr(iomr->mr);
+ if (iomr->index >= 0)
+ iomr->mr = NULL;
+ else
+ free(iomr);
+}
+
+static void rs_free_iomappings(struct rsocket *rs)
+{
+ struct rs_iomap_mr *iomr;
+
+ while (!dlist_empty(&rs->iomap_list)) {
+ iomr = container_of(rs->iomap_list.next,
+ struct rs_iomap_mr, entry);
+ riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
+ }
+ while (!dlist_empty(&rs->iomap_queue)) {
+ iomr = container_of(rs->iomap_queue.next,
+ struct rs_iomap_mr, entry);
+ riounmap(rs->index, iomr->mr->addr, iomr->mr->length);
+ }
+}
+
static void rs_free(struct rsocket *rs)
{
if (rs->index >= 0)
@@ -472,15 +568,20 @@ static void rs_free(struct rsocket *rs)
free(rs->rbuf);
}
- if (rs->target_mr)
- rdma_dereg_mr(rs->target_mr);
+ if (rs->target_buffer_list) {
+ if (rs->target_mr)
+ rdma_dereg_mr(rs->target_mr);
+ free(rs->target_buffer_list);
+ }
if (rs->cm_id) {
+ rs_free_iomappings(rs);
if (rs->cm_id->qp)
rdma_destroy_qp(rs->cm_id);
rdma_destroy_id(rs->cm_id);
}
+ fastlock_destroy(&rs->iomap_lock);
fastlock_destroy(&rs->cq_wait_lock);
fastlock_destroy(&rs->cq_lock);
fastlock_destroy(&rs->rlock);
@@ -492,9 +593,11 @@ static void rs_set_conn_data(struct rsocket *rs, struct rdma_conn_param *param,
struct rs_conn_data *conn)
{
conn->version = 1;
- conn->flags = rs_host_is_net() ? RS_CONN_FLAG_NET : 0;
+ conn->flags = RS_CONN_FLAG_IOMAP |
+ (rs_host_is_net() ? RS_CONN_FLAG_NET : 0);
conn->credits = htons(rs->rq_size);
- conn->reserved2 = 0;
+ memset(conn->reserved, 0, sizeof conn->reserved);
+ conn->target_iomap_size = (uint8_t) rs_value_to_scale(rs->target_iomap_size, 8);
conn->target_sgl.addr = htonll((uintptr_t) rs->target_sgl);
conn->target_sgl.length = htonl(RS_SGL_SIZE);
@@ -518,6 +621,13 @@ static void rs_save_conn_data(struct rsocket *rs, struct rs_conn_data *conn)
(!rs_host_is_net() && (conn->flags & RS_CONN_FLAG_NET)))
rs->opts = RS_OPT_SWAP_SGL;
+ if (conn->flags & RS_CONN_FLAG_IOMAP) {
+ rs->remote_iomap.addr = rs->remote_sgl.addr +
+ sizeof(rs->remote_sgl) * rs->remote_sgl.length;
+ rs->remote_iomap.length = rs_scale_to_value(conn->target_iomap_size, 8);
+ rs->remote_iomap.key = rs->remote_sgl.key;
+ }
+
rs->target_sgl[0].addr = ntohll(conn->data_buf.addr);
rs->target_sgl[0].length = ntohl(conn->data_buf.length);
rs->target_sgl[0].key = ntohl(conn->data_buf.key);
@@ -753,7 +863,7 @@ int rconnect(int socket, const struct sockaddr *addr, socklen_t addrlen)
return rs_do_connect(rs);
}
-static int rs_post_write(struct rsocket *rs,
+static int rs_post_write_msg(struct rsocket *rs,
struct ibv_sge *sgl, int nsge,
uint32_t imm_data, int flags,
uint64_t addr, uint32_t rkey)
@@ -773,6 +883,25 @@ static int rs_post_write(struct rsocket *rs,
return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
}
+static int rs_post_write(struct rsocket *rs,
+ struct ibv_sge *sgl, int nsge,
+ uint64_t wr_id, int flags,
+ uint64_t addr, uint32_t rkey)
+{
+ struct ibv_send_wr wr, *bad;
+
+ wr.wr_id = wr_id;
+ wr.next = NULL;
+ wr.sg_list = sgl;
+ wr.num_sge = nsge;
+ wr.opcode = IBV_WR_RDMA_WRITE;
+ wr.send_flags = flags;
+ wr.wr.rdma.remote_addr = addr;
+ wr.wr.rdma.rkey = rkey;
+
+ return rdma_seterrno(ibv_post_send(rs->cm_id->qp, &wr, &bad));
+}
+
/*
* Update target SGE before sending data. Otherwise the remote side may
* update the entry before we do.
@@ -799,8 +928,35 @@ static int rs_write_data(struct rsocket *rs,
rs->target_sge = 0;
}
- return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
- flags, addr, rkey);
+ return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_DATA, length),
+ flags, addr, rkey);
+}
+
+static int rs_write_direct(struct rsocket *rs, struct rs_iomap *iom, uint64_t offset,
+ struct ibv_sge *sgl, int nsge, uint32_t length, int flags)
+{
+ uint64_t addr;
+
+ rs->sqe_avail--;
+ rs->sbuf_bytes_avail -= length;
+
+ addr = iom->sge.addr + offset - iom->offset;
+ return rs_post_write(rs, sgl, nsge, rs_msg_set(RS_OP_WRITE, length),
+ flags, addr, iom->sge.key);
+}
+
+static int rs_write_iomap(struct rsocket *rs, struct rs_iomap_mr *iomr,
+ struct ibv_sge *sgl, int nsge, int flags)
+{
+ uint64_t addr;
+
+ rs->sseq_no++;
+ rs->sqe_avail--;
+ rs->sbuf_bytes_avail -= sizeof(struct rs_iomap);
+
+ addr = rs->remote_iomap.addr + iomr->index * sizeof(struct rs_iomap);
+ return rs_post_write_msg(rs, sgl, nsge, rs_msg_set(RS_OP_IOMAP_SGL, iomr->index),
+ flags, addr, rs->remote_iomap.key);
}
static uint32_t rs_sbuf_left(struct rsocket *rs)
@@ -831,12 +987,12 @@ static void rs_send_credits(struct rsocket *rs)
ibsge.lkey = 0;
ibsge.length = sizeof(sge);
- rs_post_write(rs, &ibsge, 1,
- rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
- IBV_SEND_INLINE,
- rs->remote_sgl.addr +
- rs->remote_sge * sizeof(struct rs_sge),
- rs->remote_sgl.key);
+ rs_post_write_msg(rs, &ibsge, 1,
+ rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
+ IBV_SEND_INLINE,
+ rs->remote_sgl.addr +
+ rs->remote_sge * sizeof(struct rs_sge),
+ rs->remote_sgl.key);
rs->rbuf_bytes_avail -= rs->rbuf_size >> 1;
rs->rbuf_free_offset += rs->rbuf_size >> 1;
@@ -845,8 +1001,9 @@ static void rs_send_credits(struct rsocket *rs)
if (++rs->remote_sge == rs->remote_sgl.length)
rs->remote_sge = 0;
} else {
- rs_post_write(rs, NULL, 0,
- rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size), 0, 0, 0);
+ rs_post_write_msg(rs, NULL, 0,
+ rs_msg_set(RS_OP_SGL, rs->rseq_no + rs->rq_size),
+ 0, 0, 0);
}
}
@@ -880,6 +1037,9 @@ static int rs_poll_cq(struct rsocket *rs)
case RS_OP_SGL:
rs->sseq_comp = (uint16_t) rs_msg_data(imm_data);
break;
+ case RS_OP_IOMAP_SGL:
+ /* The iomap was updated, that's nice to know. */
+ break;
case RS_OP_CTRL:
if (rs_msg_data(imm_data) == RS_CTRL_DISCONNECT) {
rs->state = rs_disconnected;
@@ -888,6 +1048,9 @@ static int rs_poll_cq(struct rsocket *rs)
rs->state &= ~rs_connect_rd;
}
break;
+ case RS_OP_WRITE:
+ /* We really shouldn't be here. */
+ break;
default:
rs->rmsg[rs->rmsg_tail].op = rs_msg_op(imm_data);
rs->rmsg[rs->rmsg_tail].data = rs_msg_data(imm_data);
@@ -905,6 +1068,10 @@ static int rs_poll_cq(struct rsocket *rs)
if (rs_msg_data((uint32_t) wc.wr_id) == RS_CTRL_DISCONNECT)
rs->state = rs_disconnected;
break;
+ case RS_OP_IOMAP_SGL:
+ rs->sqe_avail++;
+ rs->sbuf_bytes_avail += sizeof(struct rs_iomap);
+ break;
default:
rs->sqe_avail++;
rs->sbuf_bytes_avail += rs_msg_data((uint32_t) wc.wr_id);
@@ -1046,7 +1213,7 @@ static int rs_poll_all(struct rsocket *rs)
*/
static int rs_can_send(struct rsocket *rs)
{
- return rs->sqe_avail && rs->sbuf_bytes_avail &&
+ return rs->sqe_avail && (rs->sbuf_bytes_avail >= RS_SNDLOWAT) &&
(rs->sseq_no != rs->sseq_comp) &&
(rs->target_sgl[rs->target_sge].length != 0);
}
@@ -1216,6 +1383,73 @@ ssize_t rreadv(int socket, const struct iovec *iov, int iovcnt)
return rrecvv(socket, iov, iovcnt, 0);
}
+static int rs_send_iomaps(struct rsocket *rs, int flags)
+{
+ struct rs_iomap_mr *iomr;
+ struct ibv_sge sge;
+ struct rs_iomap iom;
+ int ret;
+
+ fastlock_acquire(&rs->iomap_lock);
+ while (!dlist_empty(&rs->iomap_queue)) {
+ if (!rs_can_send(rs)) {
+ ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+ rs_conn_can_send);
+ if (ret)
+ break;
+ if (!(rs->state & rs_connect_wr)) {
+ ret = ERR(ECONNRESET);
+ break;
+ }
+ }
+
+ iomr = container_of(rs->iomap_queue.next, struct rs_iomap_mr, entry);
+ if (!(rs->opts & RS_OPT_SWAP_SGL)) {
+ iom.offset = iomr->offset;
+ iom.sge.addr = (uintptr_t) iomr->mr->addr;
+ iom.sge.length = iomr->mr->length;
+ iom.sge.key = iomr->mr->rkey;
+ } else {
+ iom.offset = bswap_64(iomr->offset);
+ iom.sge.addr = bswap_64((uintptr_t) iomr->mr->addr);
+ iom.sge.length = bswap_32(iomr->mr->length);
+ iom.sge.key = bswap_32(iomr->mr->rkey);
+ }
+
+ if (rs->sq_inline >= sizeof iom) {
+ sge.addr = (uintptr_t) &iom;
+ sge.length = sizeof iom;
+ sge.lkey = 0;
+ ret = rs_write_iomap(rs, iomr, &sge, 1, IBV_SEND_INLINE);
+ } else if (rs_sbuf_left(rs) >= sizeof iom) {
+ memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom, sizeof iom);
+ rs->ssgl[0].length = sizeof iom;
+ ret = rs_write_iomap(rs, iomr, rs->ssgl, 1, 0);
+ if (rs_sbuf_left(rs) > sizeof iom)
+ rs->ssgl[0].addr += sizeof iom;
+ else
+ rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
+ } else {
+ rs->ssgl[0].length = rs_sbuf_left(rs);
+ memcpy((void *) (uintptr_t) rs->ssgl[0].addr, &iom,
+ rs->ssgl[0].length);
+ rs->ssgl[1].length = sizeof iom - rs->ssgl[0].length;
+ memcpy(rs->sbuf, ((void *) &iom) + rs->ssgl[0].length,
+ rs->ssgl[1].length);
+ ret = rs_write_iomap(rs, iomr, rs->ssgl, 2, 0);
+ rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
+ }
+ dlist_remove(&iomr->entry);
+ dlist_insert_tail(&iomr->entry, &rs->iomap_list);
+ if (ret)
+ break;
+ }
+
+ rs->iomap_pending = !dlist_empty(&rs->iomap_queue);
+ fastlock_release(&rs->iomap_lock);
+ return ret;
+}
+
/*
* We overlap sending the data, by posting a small work request immediately,
* then increasing the size of the send on each iteration.
@@ -1224,7 +1458,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
{
struct rsocket *rs;
struct ibv_sge sge;
- size_t left;
+ size_t left = len;
uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
int ret = 0;
@@ -1239,7 +1473,12 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
}
fastlock_acquire(&rs->slock);
- for (left = len; left; left -= xfer_size, buf += xfer_size) {
+ if (rs->iomap_pending) {
+ ret = rs_send_iomaps(rs, flags);
+ if (ret)
+ goto out;
+ }
+ for (; left; left -= xfer_size, buf += xfer_size) {
if (!rs_can_send(rs)) {
ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
rs_conn_can_send);
@@ -1289,6 +1528,7 @@ ssize_t rsend(int socket, const void *buf, size_t len, int flags)
if (ret)
break;
}
+out:
fastlock_release(&rs->slock);
return (ret && left == len) ? ret : len - left;
@@ -1345,9 +1585,15 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
len = iov[0].iov_len;
for (i = 1; i < iovcnt; i++)
len += iov[i].iov_len;
+ left = len;
fastlock_acquire(&rs->slock);
- for (left = len; left; left -= xfer_size) {
+ if (rs->iomap_pending) {
+ ret = rs_send_iomaps(rs, flags);
+ if (ret)
+ goto out;
+ }
+ for (; left; left -= xfer_size) {
if (!rs_can_send(rs)) {
ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
rs_conn_can_send);
@@ -1395,6 +1641,7 @@ static ssize_t rsendv(int socket, const struct iovec *iov, int iovcnt, int flags
if (ret)
break;
}
+out:
fastlock_release(&rs->slock);
return (ret && left == len) ? ret : len - left;
@@ -1725,8 +1972,8 @@ int rshutdown(int socket, int how)
if ((rs->state & rs_connected) && rs->ctrl_avail) {
rs->ctrl_avail--;
- ret = rs_post_write(rs, NULL, 0,
- rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
+ ret = rs_post_write_msg(rs, NULL, 0,
+ rs_msg_set(RS_OP_CTRL, ctrl), 0, 0, 0);
}
}
@@ -1814,6 +2061,8 @@ int rsetsockopt(int socket, int level, int optname,
case SO_SNDBUF:
if (!rs->sbuf)
rs->sbuf_size = (*(uint32_t *) optval) << 1;
+ if (rs->sbuf_size < RS_SNDLOWAT)
+ rs->sbuf_size = RS_SNDLOWAT << 1;
ret = 0;
break;
case SO_LINGER:
@@ -1878,6 +2127,10 @@ int rsetsockopt(int socket, int level, int optname,
if (rs->sq_inline < RS_MIN_INLINE)
rs->sq_inline = RS_MIN_INLINE;
break;
+ case RDMA_IOMAPSIZE:
+ rs->target_iomap_size = (uint16_t) rs_scale_to_value(
+ (uint8_t) rs_value_to_scale(*(int *) optval, 8), 8);
+ break;
default:
break;
}
@@ -1979,6 +2232,10 @@ int rgetsockopt(int socket, int level, int optname,
*((int *) optval) = rs->sq_inline;
*optlen = sizeof(int);
break;
+ case RDMA_IOMAPSIZE:
+ *((int *) optval) = rs->target_iomap_size;
+ *optlen = sizeof(int);
+ break;
default:
ret = ENOTSUP;
break;
@@ -2020,3 +2277,201 @@ int rfcntl(int socket, int cmd, ... /* arg */ )
va_end(args);
return ret;
}
+
+static struct rs_iomap_mr *rs_get_iomap_mr(struct rsocket *rs)
+{
+ int i;
+
+ if (!rs->remote_iomappings) {
+ rs->remote_iomappings = calloc(rs->remote_iomap.length,
+ sizeof(*rs->remote_iomappings));
+ if (!rs->remote_iomappings)
+ return NULL;
+
+ for (i = 0; i < rs->remote_iomap.length; i++)
+ rs->remote_iomappings[i].index = i;
+ }
+
+ for (i = 0; i < rs->remote_iomap.length; i++) {
+ if (!rs->remote_iomappings[i].mr)
+ return &rs->remote_iomappings[i];
+ }
+ return NULL;
+}
+
+/*
+ * If an offset is given, we map to it. If offset is -1, then we map the
+ * offset to the address of buf. We do not check for conflicts, which must
+ * be fixed at some point.
+ */
+off_t riomap(int socket, void *buf, size_t len, int prot, int flags, off_t offset)
+{
+ struct rsocket *rs;
+ struct rs_iomap_mr *iomr;
+ int access = IBV_ACCESS_LOCAL_WRITE;
+
+ rs = idm_at(&idm, socket);
+ if (!rs->cm_id->pd || (prot & ~(PROT_WRITE | PROT_NONE)))
+ return ERR(EINVAL);
+
+ fastlock_acquire(&rs->iomap_lock);
+ if (prot & PROT_WRITE) {
+ iomr = rs_get_iomap_mr(rs);
+ access |= IBV_ACCESS_REMOTE_WRITE;
+ } else {
+ iomr = calloc(1, sizeof *iomr);
+ iomr->index = -1;
+ }
+ if (!iomr) {
+ offset = ERR(ENOMEM);
+ goto out;
+ }
+
+ iomr->mr = ibv_reg_mr(rs->cm_id->pd, buf, len, access);
+ if (!iomr->mr) {
+ if (iomr->index < 0)
+ free(iomr);
+ offset = -1;
+ goto out;
+ }
+
+ if (offset == -1)
+ offset = (uintptr_t) buf;
+ iomr->offset = offset;
+ atomic_init(&iomr->refcnt);
+ atomic_set(&iomr->refcnt, 1);
+
+ if (iomr->index >= 0) {
+ dlist_insert_tail(&iomr->entry, &rs->iomap_queue);
+ rs->iomap_pending = 1;
+ } else {
+ dlist_insert_tail(&iomr->entry, &rs->iomap_list);
+ }
+out:
+ fastlock_release(&rs->iomap_lock);
+ return offset;
+}
+
+int riounmap(int socket, void *buf, size_t len)
+{
+ struct rsocket *rs;
+ struct rs_iomap_mr *iomr;
+ dlist_entry *entry;
+ int ret = 0;
+
+ rs = idm_at(&idm, socket);
+ fastlock_acquire(&rs->iomap_lock);
+
+ for (entry = rs->iomap_list.next; entry != &rs->iomap_list;
+ entry = entry->next) {
+ iomr = container_of(entry, struct rs_iomap_mr, entry);
+ if (iomr->mr->addr == buf && iomr->mr->length == len) {
+ rs_release_iomap_mr(iomr);
+ goto out;
+ }
+ }
+
+ for (entry = rs->iomap_queue.next; entry != &rs->iomap_queue;
+ entry = entry->next) {
+ iomr = container_of(entry, struct rs_iomap_mr, entry);
+ if (iomr->mr->addr == buf && iomr->mr->length == len) {
+ rs_release_iomap_mr(iomr);
+ goto out;
+ }
+ }
+ ret = ERR(EINVAL);
+out:
+ fastlock_release(&rs->iomap_lock);
+ return ret;
+}
+
+static struct rs_iomap *rs_find_iomap(struct rsocket *rs, off_t offset)
+{
+ int i;
+
+ for (i = 0; i < rs->target_iomap_size; i++) {
+ if (offset >= rs->target_iomap[i].offset &&
+ offset < rs->target_iomap[i].offset + rs->target_iomap[i].sge.length)
+ return &rs->target_iomap[i];
+ }
+ return NULL;
+}
+
+size_t riowrite(int socket, const void *buf, size_t count, off_t offset, int flags)
+{
+ struct rsocket *rs;
+ struct rs_iomap *iom = NULL;
+ struct ibv_sge sge;
+ size_t left = count;
+ uint32_t xfer_size, olen = RS_OLAP_START_SIZE;
+ int ret = 0;
+
+ rs = idm_at(&idm, socket);
+ fastlock_acquire(&rs->slock);
+ if (rs->iomap_pending) {
+ ret = rs_send_iomaps(rs, flags);
+ if (ret)
+ goto out;
+ }
+ for (; left; left -= xfer_size, buf += xfer_size, offset += xfer_size) {
+ if (!iom || offset > iom->offset + iom->sge.length) {
+ iom = rs_find_iomap(rs, offset);
+ if (!iom)
+ break;
+ }
+
+ if (!rs_can_send(rs)) {
+ ret = rs_get_comp(rs, rs_nonblocking(rs, flags),
+ rs_conn_can_send);
+ if (ret)
+ break;
+ if (!(rs->state & rs_connect_wr)) {
+ ret = ERR(ECONNRESET);
+ break;
+ }
+ }
+
+ if (olen < left) {
+ xfer_size = olen;
+ if (olen < RS_MAX_TRANSFER)
+ olen <<= 1;
+ } else {
+ xfer_size = left;
+ }
+
+ if (xfer_size > rs->sbuf_bytes_avail)
+ xfer_size = rs->sbuf_bytes_avail;
+ if (xfer_size > iom->offset + iom->sge.length - offset)
+ xfer_size = iom->offset + iom->sge.length - offset;
+
+ if (xfer_size <= rs->sq_inline) {
+ sge.addr = (uintptr_t) buf;
+ sge.length = xfer_size;
+ sge.lkey = 0;
+ ret = rs_write_direct(rs, iom, offset, &sge, 1,
+ xfer_size, IBV_SEND_INLINE);
+ } else if (xfer_size <= rs_sbuf_left(rs)) {
+ memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf, xfer_size);
+ rs->ssgl[0].length = xfer_size;
+ ret = rs_write_direct(rs, iom, offset, rs->ssgl, 1, xfer_size, 0);
+ if (xfer_size < rs_sbuf_left(rs))
+ rs->ssgl[0].addr += xfer_size;
+ else
+ rs->ssgl[0].addr = (uintptr_t) rs->sbuf;
+ } else {
+ rs->ssgl[0].length = rs_sbuf_left(rs);
+ memcpy((void *) (uintptr_t) rs->ssgl[0].addr, buf,
+ rs->ssgl[0].length);
+ rs->ssgl[1].length = xfer_size - rs->ssgl[0].length;
+ memcpy(rs->sbuf, buf + rs->ssgl[0].length, rs->ssgl[1].length);
+ ret = rs_write_direct(rs, iom, offset, rs->ssgl, 2, xfer_size, 0);
+ rs->ssgl[0].addr = (uintptr_t) rs->sbuf + rs->ssgl[1].length;
+ }
+ if (ret)
+ break;
+ }
+out:
+ fastlock_release(&rs->slock);
+
+ return (ret && left == count) ? ret : count - left;
+}
--