From: Liu Yuan <[email protected]>

Old sockfd pool has following defect:
 0 statically allocated.
 1 use too many fds per sheep, not scalable
 2 implemented per thread, can't be shared between threads
 3 need resetting at every membership change

The new sockfd cache aims to address these problems while remaining as efficient
as the old one:
 0 dynamically allocated/deallocated at node granularity.
 1 cached fds are multiplexed by all threads.
 2 each session (e.g., forward_write_obj_req) can grab one fd at a time
 3 if there isn't any FD available from the cache, use normal connect_to() and
   close() internally
 4 FDs are named uniquely by IP:PORT, hence no need to reset them at membership
   changes
 5 the total number of FDs shrinks from (nr_gateway + nr_io) * nr_nodes to 
nr_nodes
 6 adds just one more API, for a total of 3 APIs: sheep_{get,put,del}_fd()

Signed-off-by: Liu Yuan <[email protected]>
---
 include/sheep.h      |    2 +-
 sheep/Makefile.am    |    2 +-
 sheep/sdnet.c        |   66 +-----------
 sheep/sheep_priv.h   |   14 ++-
 sheep/sockfd_cache.c |  292 ++++++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 306 insertions(+), 70 deletions(-)
 create mode 100644 sheep/sockfd_cache.c

diff --git a/include/sheep.h b/include/sheep.h
index ac9179c..7fbc26d 100644
--- a/include/sheep.h
+++ b/include/sheep.h
@@ -155,11 +155,11 @@ struct sd_node {
 };
 
 struct sd_vnode {
-       uint64_t        id;
        uint8_t         addr[16];
        uint16_t        port;
        uint16_t        node_idx;
        uint32_t        zone;
+       uint64_t        id;
 };
 
 struct epoch_log {
diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index 1cb2ebf..0a874c6 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -26,7 +26,7 @@ sbin_PROGRAMS         = sheep
 
 sheep_SOURCES          = sheep.c group.c sdnet.c gateway.c store.c vdi.c 
work.c \
                          journal.c ops.c recovery.c cluster/local.c \
-                         object_cache.c object_list_cache.c
+                         object_cache.c object_list_cache.c sockfd_cache.c
 
 if BUILD_COROSYNC
 sheep_SOURCES          += cluster/corosync.c
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index 7178e9e..eb8d0a1 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -19,6 +19,7 @@
 #include <fcntl.h>
 
 #include "sheep_priv.h"
+#include "rbtree.h"
 
 static void requeue_request(struct request *req);
 
@@ -748,68 +749,3 @@ int create_listen_port(int port, void *data)
        return create_listen_ports(port, create_listen_port_fn, data);
 }
 
-static __thread int cached_fds[SD_MAX_NODES];
-static __thread uint32_t cached_epoch = 0;
-
-void del_sheep_fd(int fd)
-{
-       int i;
-
-       for (i = 0; i < SD_MAX_NODES; i++) {
-               if (cached_fds[i] == fd) {
-                       if (fd >= 0)
-                               close(fd);
-
-                       cached_fds[i] = -1;
-
-                       return;
-               }
-       }
-}
-
-int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch)
-{
-       int i, fd;
-       char name[INET6_ADDRSTRLEN];
-
-       if (cached_epoch == 0) {
-               /* initialize */
-               for (i = 0; i < SD_MAX_NODES; i++)
-                       cached_fds[i] = -1;
-
-               cached_epoch = epoch;
-       }
-
-       if (before(epoch, cached_epoch)) {
-               eprintf("requested epoch is smaller than the previous one: %d < 
%d\n",
-                       epoch, cached_epoch);
-               return -1;
-       }
-       if (after(epoch, cached_epoch)) {
-               for (i = 0; i < SD_MAX_NODES; i++) {
-                       if (cached_fds[i] >= 0)
-                               close(cached_fds[i]);
-
-                       cached_fds[i] = -1;
-               }
-               cached_epoch = epoch;
-       }
-
-       fd = cached_fds[node_idx];
-       dprintf("%d, %d\n", epoch, fd);
-
-       if (cached_epoch == epoch && fd >= 0) {
-               dprintf("using the cached fd %d\n", fd);
-               return fd;
-       }
-
-       addr_to_str(name, sizeof(name), addr, 0);
-
-       fd = connect_to(name, port);
-       if (fd < 0)
-               return -1;
-
-       cached_fds[node_idx] = fd;
-
-       return fd;
-}
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index afdaad8..83a42e3 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -292,9 +292,6 @@ int read_object(struct vnode_info *vnodes, uint32_t 
node_version,
 int remove_object(struct vnode_info *vnodes, uint32_t node_version,
                  uint64_t oid, int nr);
 
-void del_sheep_fd(int fd);
-int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch);
-
 int prealloc(int fd, uint32_t size);
 
 int objlist_cache_insert(uint64_t oid);
@@ -404,4 +401,15 @@ void object_cache_delete(uint32_t vid);
 
 int object_cache_init(const char *p);
 
+/* sockfd_cache */
+struct node_id;
+
+void sockfd_cache_del(struct node_id *);
+void sockfd_cache_add(struct sd_node *);
+void sockfd_cache_add_group(struct sd_node *nodes, int nr);
+
+int sheep_get_fd(struct sd_vnode *vnode);
+void sheep_put_fd(struct sd_vnode *vnode, int fd);
+void sheep_del_fd(struct sd_vnode *vnode, int fd);
+
 #endif
diff --git a/sheep/sockfd_cache.c b/sheep/sockfd_cache.c
new file mode 100644
index 0000000..00095c2
--- /dev/null
+++ b/sheep/sockfd_cache.c
@@ -0,0 +1,292 @@
+/*
+ * Copyright (C) 2012 Taobao Inc.
+ *
+ * Liu Yuan <[email protected]>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <urcu/uatomic.h>
+#include <pthread.h>
+#include <stdint.h>
+#include <stdlib.h>
+#include <stdio.h>
+
+#include "sheep.h"
+#include "sheep_priv.h"
+#include "list.h"
+#include "rbtree.h"
+#include "logger.h"
+#include "util.h"
+
+struct node_id {
+       uint8_t addr[16];
+       uint16_t port;
+};
+
+struct sockfd_cache {
+       struct rb_root root;
+       pthread_rwlock_t lock;
+       int count;
+};
+
+static struct sockfd_cache sockfd_cache = {
+       .root = RB_ROOT,
+       .lock = PTHREAD_RWLOCK_INITIALIZER,
+};
+
+struct sockfd_cache_entry {
+       struct rb_node rb;
+       int fd;
+       uint8_t refcount;
+       struct node_id nid;
+};
+
+static inline int node_id_cmp(const void *a, const void *b)
+{
+       const struct node_id *node1 = a;
+       const struct node_id *node2 = b;
+       int cmp;
+
+       cmp = memcmp(node1->addr, node2->addr, sizeof(node1->addr));
+       if (cmp != 0)
+               return cmp;
+
+       if (node1->port < node2->port)
+               return -1;
+       if (node1->port > node2->port)
+               return 1;
+       return 0;
+}
+
+static struct sockfd_cache_entry *
+sockfd_cache_insert(struct sockfd_cache_entry *new)
+{
+       struct rb_node **p = &sockfd_cache.root.rb_node;
+       struct rb_node *parent = NULL;
+       struct sockfd_cache_entry *entry;
+
+       while (*p) {
+               int cmp;
+
+               parent = *p;
+               entry = rb_entry(parent, struct sockfd_cache_entry, rb);
+               cmp = node_id_cmp(&new->nid, &entry->nid);
+
+               if (cmp < 0)
+                       p = &(*p)->rb_left;
+               else if (cmp > 0)
+                       p = &(*p)->rb_right;
+               else
+                       return entry;
+       }
+       rb_link_node(&new->rb, parent, p);
+       rb_insert_color(&new->rb, &sockfd_cache.root);
+
+       return NULL; /* insert successfully */
+}
+
+static struct sockfd_cache_entry *sockfd_cache_search(struct node_id *nid)
+{
+       struct rb_node *n = sockfd_cache.root.rb_node;
+       struct sockfd_cache_entry *t;
+
+       while (n) {
+               int cmp;
+
+               t = rb_entry(n, struct sockfd_cache_entry, rb);
+               cmp = node_id_cmp(nid, &t->nid);
+
+               if (cmp < 0)
+                       n = n->rb_left;
+               else if (cmp > 0)
+                       n = n->rb_right;
+               else
+                       return t; /* found it */
+       }
+
+       return NULL;
+}
+
+static struct sockfd_cache_entry *sockfd_cache_grab(struct node_id *nid)
+{
+       struct sockfd_cache_entry *entry;
+
+       pthread_rwlock_rdlock(&sockfd_cache.lock);
+       entry = sockfd_cache_search(nid);
+       pthread_rwlock_unlock(&sockfd_cache.lock);
+       assert(entry);
+       /* if refcount == 0, set it to 1, otherwise someone holds it */
+       if (uatomic_cmpxchg(&entry->refcount, 0, 1))
+               return NULL;
+
+       return entry;
+}
+
+void sockfd_cache_del(struct node_id *nid)
+{
+       struct sockfd_cache_entry *entry;
+       char name[INET6_ADDRSTRLEN];
+       int n;
+
+       entry = sockfd_cache_grab(nid);
+       /* Hmmm, some victim still holds it, he is supposed to delete it */
+       if (!entry)
+               return;
+
+       rb_erase(&entry->rb, &sockfd_cache.root);
+       free(entry);
+       n = uatomic_sub_return(&sockfd_cache.count, 1);
+       addr_to_str(name, sizeof(name), nid->addr, 0);
+       dprintf("%s:%d, count %d\n", name, nid->port, n);
+}
+
+static void sockfd_cache_add_nolock(struct node_id *nid)
+{
+       struct sockfd_cache_entry *new = xzalloc(sizeof(*new));
+
+       new->fd = -1;
+       memcpy(&new->nid, nid, sizeof(struct node_id));
+       if (sockfd_cache_insert(new)) {
+               free(new);
+               return;
+       }
+       sockfd_cache.count++;
+}
+
+void sockfd_cache_add_group(struct sd_node *nodes, int nr)
+{
+       struct sd_node *p;
+       struct node_id *nid;
+
+       dprintf("%d\n", nr);
+       pthread_rwlock_wrlock(&sockfd_cache.lock);
+       while (nr--) {
+               p = nodes + nr;
+               nid = (struct node_id *)p;
+               sockfd_cache_add_nolock(nid);
+       }
+       pthread_rwlock_unlock(&sockfd_cache.lock);
+}
+
+void sockfd_cache_add(struct sd_node *node)
+{
+       struct sockfd_cache_entry *new = xzalloc(sizeof(*new));
+       char name[INET6_ADDRSTRLEN];
+       int n;
+
+       new->fd = -1;
+       memcpy(&new->nid, node, sizeof(struct node_id));
+       pthread_rwlock_rdlock(&sockfd_cache.lock);
+       if (sockfd_cache_insert(new)) {
+               free(new);
+               pthread_rwlock_unlock(&sockfd_cache.lock);
+               return;
+       }
+       pthread_rwlock_unlock(&sockfd_cache.lock);
+       n = uatomic_add_return(&sockfd_cache.count, 1);
+       addr_to_str(name, sizeof(name), node->addr, 0);
+       dprintf("%s:%d, count %d\n", name, node->port, n);
+}
+
+static int sockfd_cache_get(struct node_id *nid)
+{
+       struct sockfd_cache_entry *entry;
+       char name[INET6_ADDRSTRLEN];
+       int fd;
+
+       entry = sockfd_cache_grab(nid);
+       if (!entry)
+               return -1;
+
+       if (entry->fd != -1)
+               return entry->fd;
+
+       /* Create a new connection for this vnode */
+       addr_to_str(name, sizeof(name), nid->addr, 0);
+       dprintf("create connection %s:%d\n", name, nid->port);
+       fd = connect_to(name, nid->port);
+       if (fd < 0) {
+               uatomic_dec(&entry->refcount);
+               return -1;
+       }
+       entry->fd = fd;
+
+       return fd;
+}
+
+static void sockfd_cache_put(struct node_id *nid)
+{
+       struct sockfd_cache_entry *entry;
+       char name[INET6_ADDRSTRLEN];
+       int refcnt;
+
+       addr_to_str(name, sizeof(name), nid->addr, 0);
+       dprintf("%s:%d\n", name, nid->port);
+       pthread_rwlock_rdlock(&sockfd_cache.lock);
+       entry = sockfd_cache_search(nid);
+       pthread_rwlock_unlock(&sockfd_cache.lock);
+       assert(entry);
+       refcnt = uatomic_cmpxchg(&entry->refcount, 1, 0);
+       assert(refcnt == 1);
+}
+
+int sheep_get_fd(struct sd_vnode *vnode)
+{
+       struct node_id *nid = (struct node_id *)vnode;
+       char name[INET6_ADDRSTRLEN];
+       int fd = sockfd_cache_get(nid);
+
+       if (fd != -1)
+               return fd;
+
+       addr_to_str(name, sizeof(name), nid->addr, 0);
+       fd = connect_to(name, nid->port);
+       if (fd < 0) {
+               dprintf("failed connect to %s:%d\n", name, nid->port);
+               return -1;
+       }
+
+       dprintf("%d\n", fd);
+       return fd;
+}
+
+void sheep_put_fd(struct sd_vnode *vnode, int fd)
+{
+       struct node_id *nid = (struct node_id *)vnode;
+       struct sockfd_cache_entry *entry;
+
+       pthread_rwlock_rdlock(&sockfd_cache.lock);
+       entry = sockfd_cache_search(nid);
+       pthread_rwlock_unlock(&sockfd_cache.lock);
+       assert(entry);
+       if (entry->fd == fd) {
+               sockfd_cache_put(nid);
+       } else {
+               dprintf("%d\n", fd);
+               close(fd);
+       }
+}
+
+void sheep_del_fd(struct sd_vnode *vnode, int fd)
+{
+       struct node_id *nid = (struct node_id *)vnode;
+       struct sockfd_cache_entry *entry;
+
+       pthread_rwlock_rdlock(&sockfd_cache.lock);
+       entry = sockfd_cache_search(nid);
+       pthread_rwlock_unlock(&sockfd_cache.lock);
+       assert(entry);
+       if (entry->fd == fd) {
+               sockfd_cache_put(nid);
+               sockfd_cache_del(nid);
+       } else {
+               dprintf("%d\n", fd);
+               close(fd);
+       }
+}
-- 
1.7.10.2

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to