From: Liu Yuan <[email protected]>

This is inspired by the observation that each Guest can issue as much
as 4 requests in one go.

The complexity added to the code is seen outside of sockfd cache:
add one more parameter to the API: FD index. The underlying core needs this to
identify which FD belongs one node is actually used.

I think this trade-off is a good deal.

Signed-off-by: Liu Yuan <[email protected]>
---
 sheep/gateway.c      |   31 ++++---
 sheep/sheep_priv.h   |    6 +-
 sheep/sockfd_cache.c |  227 ++++++++++++++++++++++++++++++++++++++------------
 3 files changed, 197 insertions(+), 67 deletions(-)

diff --git a/sheep/gateway.c b/sheep/gateway.c
index c3e5f80..f126dfb 100644
--- a/sheep/gateway.c
+++ b/sheep/gateway.c
@@ -54,11 +54,13 @@ read_remote:
        j = random();
        for (i = 0; i < nr_copies; i++) {
                int idx = (i + j) % nr_copies;
+               int sock_idx;
+
                v = obj_vnodes[idx];
                if (vnode_is_local(v))
                        continue;
 
-               fd = sheep_get_fd(v);
+               fd = sheep_get_fd(v, &sock_idx);
                if (fd < 0) {
                        ret = SD_RES_NETWORK_ERROR;
                        continue;
@@ -70,11 +72,11 @@ read_remote:
                ret = exec_req(fd, &fwd_hdr, req->data, &wlen, &rlen);
 
                if (ret) { /* network errors */
-                       sheep_del_fd(v, fd);
+                       sheep_del_fd(v, fd, sock_idx);
                        ret = SD_RES_NETWORK_ERROR;
                        continue;
                } else {
-                       sheep_put_fd(v, fd);
+                       sheep_put_fd(v, fd, sock_idx);
                        memcpy(&req->rp, rsp, sizeof(*rsp));
                        ret = rsp->result;
                        break;
@@ -86,6 +88,7 @@ read_remote:
 struct write_info {
        struct pollfd pfds[SD_MAX_REDUNDANCY];
        struct sd_vnode *vnodes[SD_MAX_REDUNDANCY];
+       int sock_idx[SD_MAX_REDUNDANCY];
 };
 
 int forward_write_obj_req(struct request *req)
@@ -104,9 +107,10 @@ int forward_write_obj_req(struct request *req)
 
        dprintf("%"PRIx64"\n", oid);
 
-       memset(&wi, 0, sizeof(wi));
-       for (i = 0; i < SD_MAX_REDUNDANCY; i++)
+       for (i = 0; i < SD_MAX_REDUNDANCY; i++) {
                wi.pfds[i].fd = -1;
+               wi.vnodes[i] = NULL;
+       }
 
        memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
        fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL;
@@ -125,7 +129,7 @@ int forward_write_obj_req(struct request *req)
                        continue;
                }
 
-               fd = sheep_get_fd(v);
+               fd = sheep_get_fd(v, &wi.sock_idx[nr_fds]);
                if (fd < 0) {
                        ret = SD_RES_NETWORK_ERROR;
                        goto err;
@@ -133,7 +137,7 @@ int forward_write_obj_req(struct request *req)
 
                ret = send_req(fd, &fwd_hdr, req->data, &wlen);
                if (ret) { /* network errors */
-                       sheep_del_fd(v, fd);
+                       sheep_del_fd(v, fd, wi.sock_idx[nr_fds]);
                        ret = SD_RES_NETWORK_ERROR;
                        dprintf("fail %"PRIu32"\n", ret);
                        goto err;
@@ -174,7 +178,8 @@ again:
                if (wi.pfds[i].revents & POLLERR ||
                    wi.pfds[i].revents & POLLHUP ||
                    wi.pfds[i].revents & POLLNVAL) {
-                       sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd);
+                       sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd,
+                                    wi.sock_idx[i]);
                        ret = SD_RES_NETWORK_ERROR;
                        break;
                }
@@ -184,7 +189,8 @@ again:
 
                if (do_read(wi.pfds[i].fd, rsp, sizeof(*rsp))) {
                        eprintf("failed to read a response: %m\n");
-                       sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd);
+                       sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd,
+                                    wi.sock_idx[i]);
                        ret = SD_RES_NETWORK_ERROR;
                        break;
                }
@@ -193,8 +199,7 @@ again:
                        eprintf("fail %"PRIu32"\n", rsp->result);
                        ret = rsp->result;
                }
-
-               sheep_put_fd(wi.vnodes[i], wi.pfds[i].fd);
+               sheep_put_fd(wi.vnodes[i], wi.pfds[i].fd, wi.sock_idx[i]);
                break;
        }
        if (i < nr_fds) {
@@ -203,6 +208,8 @@ again:
                        sizeof(struct pollfd) * (nr_fds - i));
                memmove(wi.vnodes + i, wi.vnodes + i + 1,
                        sizeof(struct sd_vnode *) * (nr_fds - i));
+               memmove(wi.sock_idx + i, wi.sock_idx + i + 1,
+                       sizeof(int) * (nr_fds - i));
        }
 
        dprintf("%"PRIx64" %"PRIu32"\n", oid, nr_fds);
@@ -213,7 +220,7 @@ out:
        return ret;
 err:
        for (i = 0; i < nr_fds; i++)
-               sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd);
+               sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd, wi.sock_idx[i]);
        return ret;
 }
 
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 83a42e3..747599c 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -408,8 +408,8 @@ void sockfd_cache_del(struct node_id *);
 void sockfd_cache_add(struct sd_node *);
 void sockfd_cache_add_group(struct sd_node *nodes, int nr);
 
-int sheep_get_fd(struct sd_vnode *vnode);
-void sheep_put_fd(struct sd_vnode *vnode, int fd);
-void sheep_del_fd(struct sd_vnode *vnode, int fd);
+int sheep_get_fd(struct sd_vnode *vnode, int *);
+void sheep_put_fd(struct sd_vnode *vnode, int fd, int);
+void sheep_del_fd(struct sd_vnode *vnode, int fd, int);
 
 #endif
diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c
index 00095c2..61b5595 100644
--- a/sheep/sockfd_cache.c
+++ b/sheep/sockfd_cache.c
@@ -11,6 +11,20 @@
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
  */
 
+/*
+ * The sockfd cache provides us long TCP connections connected to the nodes
+ * in the cluster to accerlater the data transfer, which has the following
+ * characteristics:
+ *    0 dynamically allocated/deallocated at node granularity.
+ *    1 cached fds are multiplexed by all threads.
+ *    2 each session (for e.g, forward_write_obj_req) can grab one fd at a 
time.
+ *    3 if there isn't any FD available from cache, use normal connect_to() and
+ *      close() internally.
+ *    4 FD are named by IP:PORT uniquely, hence no need of resetting at
+ *      membership change.
+ *    5 the total number of FDs is scalable to massive nodes.
+ *    6 total 3 APIs: sheep_{get,put,del}_fd().
+ */
 #include <urcu/uatomic.h>
 #include <pthread.h>
 #include <stdint.h>
@@ -42,9 +56,19 @@ static struct sockfd_cache sockfd_cache = {
 
 struct sockfd_cache_entry {
        struct rb_node rb;
-       int fd;
-       uint8_t refcount;
        struct node_id nid;
+#define SOCKFD_CACHE_MAX_FD    8 /* How many FDs we cache for one node */
+       /*
+        * FIXME: Make this macro configurable.
+        *
+        * Suppose request size from Guest is 512k, then 4M / 512k = 8, so at
+        * most 8 requests can be issued to the same sheep object. Based on this
+        * assumption, '8' would be effecient for servers that only host 4~8
+        * Guests, but for powerful servers that can host dozens of Guests, we
+        * might consider bigger value.
+        */
+       int fd[SOCKFD_CACHE_MAX_FD];
+       uint8_t fd_in_use[SOCKFD_CACHE_MAX_FD];
 };
 
 static inline int node_id_cmp(const void *a, const void *b)
@@ -113,34 +137,105 @@ static struct sockfd_cache_entry 
*sockfd_cache_search(struct node_id *nid)
        return NULL;
 }
 
-static struct sockfd_cache_entry *sockfd_cache_grab(struct node_id *nid)
+static inline int get_free_slot(struct sockfd_cache_entry *entry)
+{
+       int idx = -1, i;
+
+       for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++) {
+               if (uatomic_cmpxchg(&entry->fd_in_use[i], 0, 1))
+                       continue;
+               idx = i;
+               break;
+       }
+       return idx;
+}
+
+/*
+ * Grab a free slot of the node and inc the refcount of the slot
+ *
+ * If no free slot available, this typically means we should use short FD.
+ */
+static struct sockfd_cache_entry *sockfd_cache_grab(struct node_id *nid,
+                                                   char *name, int *ret_idx)
 {
        struct sockfd_cache_entry *entry;
 
        pthread_rwlock_rdlock(&sockfd_cache.lock);
        entry = sockfd_cache_search(nid);
-       pthread_rwlock_unlock(&sockfd_cache.lock);
-       assert(entry);
-       /* if refcount == 0, set it to 1, otherwise someone holds it */
-       if (uatomic_cmpxchg(&entry->refcount, 0, 1))
-               return NULL;
+       if (!entry) {
+               dprintf("failed node %s:%d\n", name, nid->port);
+               goto out;
+       }
 
+       *ret_idx = get_free_slot(entry);
+       if (*ret_idx == -1)
+               entry = NULL;
+out:
+       pthread_rwlock_unlock(&sockfd_cache.lock);
        return entry;
 }
 
-void sockfd_cache_del(struct node_id *nid)
+static inline bool slots_all_free(struct sockfd_cache_entry *entry)
+{
+       int i;
+       for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
+               if (uatomic_read(&entry->fd_in_use[i]))
+                       return false;
+       return true;
+}
+
+static inline void destroy_all_slots(struct sockfd_cache_entry *entry)
+{
+       int i;
+       for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
+               if (entry->fd[i] != -1)
+                       close(entry->fd[i]);
+}
+
+/*
+ * Destroy all the Cached FDs of the node
+ *
+ * We don't proceed if some other node grab one FD of the node. In this case,
+ * the victim node will finally find itself talking to a dead node and call
+ * sheep_del_fd() to delete this node from the cache.
+ */
+static bool sockfd_cache_destroy(struct node_id *nid)
 {
        struct sockfd_cache_entry *entry;
+
+       pthread_rwlock_wrlock(&sockfd_cache.lock);
+       entry = sockfd_cache_search(nid);
+       if (!entry) {
+               dprintf("It is already destroyed\n");
+               goto false_out;
+       }
+
+       if (!slots_all_free(entry)) {
+               dprintf("Some victim still holds it\n");
+               goto false_out;
+       }
+
+       rb_erase(&entry->rb, &sockfd_cache.root);
+       pthread_rwlock_unlock(&sockfd_cache.lock);
+
+       destroy_all_slots(entry);
+       free(entry);
+
+       return true;
+false_out:
+       pthread_rwlock_unlock(&sockfd_cache.lock);
+       return false;
+}
+
+/* When node craches, we should delete it from the cache */
+void sockfd_cache_del(struct node_id *nid)
+{
        char name[INET6_ADDRSTRLEN];
        int n;
 
-       entry = sockfd_cache_grab(nid);
-       /* Hmmm, some victim still holds it, he is supposed to delete it */
-       if (!entry)
+       if (!sockfd_cache_destroy(nid))
                return;
 
-       rb_erase(&entry->rb, &sockfd_cache.root);
-       free(entry);
        n = uatomic_sub_return(&sockfd_cache.count, 1);
        addr_to_str(name, sizeof(name), nid->addr, 0);
        dprintf("%s:%d, count %d\n", name, nid->port, n);
@@ -149,8 +244,11 @@ void sockfd_cache_del(struct node_id *nid)
 static void sockfd_cache_add_nolock(struct node_id *nid)
 {
        struct sockfd_cache_entry *new = xzalloc(sizeof(*new));
+       int i;
+
+       for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
+               new->fd[i] = -1;
 
-       new->fd = -1;
        memcpy(&new->nid, nid, sizeof(struct node_id));
        if (sockfd_cache_insert(new)) {
                free(new);
@@ -159,6 +257,7 @@ static void sockfd_cache_add_nolock(struct node_id *nid)
        sockfd_cache.count++;
 }
 
+/* Add group of nodes to the cache */
 void sockfd_cache_add_group(struct sd_node *nodes, int nr)
 {
        struct sd_node *p;
@@ -174,13 +273,16 @@ void sockfd_cache_add_group(struct sd_node *nodes, int nr)
        pthread_rwlock_unlock(&sockfd_cache.lock);
 }
 
+/* Add one node to the cache means we can do caching tricks on this node */
 void sockfd_cache_add(struct sd_node *node)
 {
        struct sockfd_cache_entry *new = xzalloc(sizeof(*new));
        char name[INET6_ADDRSTRLEN];
-       int n;
+       int n, i;
+
+       for (i = 0; i < SOCKFD_CACHE_MAX_FD; i++)
+               new->fd[i] = -1;
 
-       new->fd = -1;
        memcpy(&new->nid, node, sizeof(struct node_id));
        pthread_rwlock_rdlock(&sockfd_cache.lock);
        if (sockfd_cache_insert(new)) {
@@ -194,58 +296,71 @@ void sockfd_cache_add(struct sd_node *node)
        dprintf("%s:%d, count %d\n", name, node->port, n);
 }
 
-static int sockfd_cache_get(struct node_id *nid)
+static int sockfd_cache_get(struct node_id *nid, char *name, int *ret_idx)
 {
        struct sockfd_cache_entry *entry;
-       char name[INET6_ADDRSTRLEN];
        int fd;
 
-       entry = sockfd_cache_grab(nid);
+       entry = sockfd_cache_grab(nid, name, ret_idx);
        if (!entry)
                return -1;
 
-       if (entry->fd != -1)
-               return entry->fd;
+       if (entry->fd[*ret_idx] != -1) {
+               dprintf("%s:%d, idx %d\n", name, nid->port, *ret_idx);
+               return entry->fd[*ret_idx];
+       }
 
-       /* Create a new connection for this vnode */
-       addr_to_str(name, sizeof(name), nid->addr, 0);
-       dprintf("create connection %s:%d\n", name, nid->port);
+       /* Create a new cached connection for this vnode */
+       dprintf("create connection %s:%d idx %d\n", name, nid->port, *ret_idx);
        fd = connect_to(name, nid->port);
        if (fd < 0) {
-               uatomic_dec(&entry->refcount);
+               uatomic_dec(&entry->fd_in_use[*ret_idx]);
                return -1;
        }
-       entry->fd = fd;
+       entry->fd[*ret_idx] = fd;
 
        return fd;
 }
 
-static void sockfd_cache_put(struct node_id *nid)
+static void sockfd_cache_put(struct node_id *nid, int idx)
 {
        struct sockfd_cache_entry *entry;
        char name[INET6_ADDRSTRLEN];
        int refcnt;
 
        addr_to_str(name, sizeof(name), nid->addr, 0);
-       dprintf("%s:%d\n", name, nid->port);
+       dprintf("%s:%d idx %d\n", name, nid->port, idx);
+
        pthread_rwlock_rdlock(&sockfd_cache.lock);
        entry = sockfd_cache_search(nid);
        pthread_rwlock_unlock(&sockfd_cache.lock);
+
        assert(entry);
-       refcnt = uatomic_cmpxchg(&entry->refcount, 1, 0);
+       refcnt = uatomic_cmpxchg(&entry->fd_in_use[idx], 1, 0);
        assert(refcnt == 1);
 }
 
-int sheep_get_fd(struct sd_vnode *vnode)
+/*
+ * Return a FD connected to the vnode to the caller
+ *
+ * Try to get a 'long' FD as best, which is cached and never closed. If no FD
+ * available, we return a 'short' FD which is supposed to be closed by
+ * sheep_get_put().
+ *
+ * ret_idx is opaque to the caller, -1 indicates it is a short FD.
+ */
+int sheep_get_fd(struct sd_vnode *vnode, int *ret_idx)
 {
        struct node_id *nid = (struct node_id *)vnode;
        char name[INET6_ADDRSTRLEN];
-       int fd = sockfd_cache_get(nid);
+       int fd;
 
+       addr_to_str(name, sizeof(name), nid->addr, 0);
+       fd = sockfd_cache_get(nid, name, ret_idx);
        if (fd != -1)
                return fd;
 
-       addr_to_str(name, sizeof(name), nid->addr, 0);
+       /* Create a fd that is to be closed */
        fd = connect_to(name, nid->port);
        if (fd < 0) {
                dprintf("failed connect to %s:%d\n", name, nid->port);
@@ -256,37 +371,45 @@ int sheep_get_fd(struct sd_vnode *vnode)
        return fd;
 }
 
-void sheep_put_fd(struct sd_vnode *vnode, int fd)
+/*
+ * Rlease a FD connected to the vnode, which is acquired from sheep_get_fd()
+ *
+ * If it is a long FD, just decrease the refcount to make it available again.
+ * If it is a short FD, close it.
+ *
+ * sheep_put_fd() or sheep_del_fd() should be paired with sheep_get_fd()
+ */
+
+void sheep_put_fd(struct sd_vnode *vnode, int fd, int idx)
 {
        struct node_id *nid = (struct node_id *)vnode;
-       struct sockfd_cache_entry *entry;
 
-       pthread_rwlock_rdlock(&sockfd_cache.lock);
-       entry = sockfd_cache_search(nid);
-       pthread_rwlock_unlock(&sockfd_cache.lock);
-       assert(entry);
-       if (entry->fd == fd) {
-               sockfd_cache_put(nid);
-       } else {
+       if (idx == -1) {
                dprintf("%d\n", fd);
                close(fd);
+               return;
        }
+
+       sockfd_cache_put(nid, idx);
 }
 
-void sheep_del_fd(struct sd_vnode *vnode, int fd)
+/*
+ * Delete a FD connected to the vnode, when vnode is crashed.
+ *
+ * If it is a long FD, de-refcount it and tres to destroy all the cached FDs of
+ * this vnode in the cache.
+ * If it is a short FD, just close it.
+ */
+void sheep_del_fd(struct sd_vnode *vnode, int fd, int idx)
 {
        struct node_id *nid = (struct node_id *)vnode;
-       struct sockfd_cache_entry *entry;
 
-       pthread_rwlock_rdlock(&sockfd_cache.lock);
-       entry = sockfd_cache_search(nid);
-       pthread_rwlock_unlock(&sockfd_cache.lock);
-       assert(entry);
-       if (entry->fd == fd) {
-               sockfd_cache_put(nid);
-               sockfd_cache_del(nid);
-       } else {
+       if (idx == -1) {
                dprintf("%d\n", fd);
                close(fd);
+               return;
        }
+
+       sockfd_cache_put(nid, idx);
+       sockfd_cache_del(nid);
 }
-- 
1.7.10.2

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to