From: Liu Yuan <[email protected]>

Signed-off-by: Liu Yuan <[email protected]>
---
 sheep/gateway.c |   55 ++++++++++++++++++++++++++++++++-----------------------
 sheep/group.c   |    9 ++++++++-
 2 files changed, 40 insertions(+), 24 deletions(-)

diff --git a/sheep/gateway.c b/sheep/gateway.c
index 42f028a..c3e5f80 100644
--- a/sheep/gateway.c
+++ b/sheep/gateway.c
@@ -58,7 +58,7 @@ read_remote:
                if (vnode_is_local(v))
                        continue;
 
-               fd = get_sheep_fd(v->addr, v->port, v->node_idx, fwd_hdr.epoch);
+               fd = sheep_get_fd(v);
                if (fd < 0) {
                        ret = SD_RES_NETWORK_ERROR;
                        continue;
@@ -70,10 +70,11 @@ read_remote:
                ret = exec_req(fd, &fwd_hdr, req->data, &wlen, &rlen);
 
                if (ret) { /* network errors */
-                       del_sheep_fd(fd);
+                       sheep_del_fd(v, fd);
                        ret = SD_RES_NETWORK_ERROR;
                        continue;
                } else {
+                       sheep_put_fd(v, fd);
                        memcpy(&req->rp, rsp, sizeof(*rsp));
                        ret = rsp->result;
                        break;
@@ -82,6 +83,11 @@ read_remote:
        return ret;
 }
 
+struct write_info {
+       struct pollfd pfds[SD_MAX_REDUNDANCY]; /* fds polled (POLLIN) for write responses */
+       struct sd_vnode *vnodes[SD_MAX_REDUNDANCY]; /* vnodes[i] is the vnode for pfds[i].fd */
+};
+
 int forward_write_obj_req(struct request *req)
 {
        int i, fd, ret, pollret;
@@ -93,15 +99,14 @@ int forward_write_obj_req(struct request *req)
        struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
        uint64_t oid = req->rq.obj.oid;
        int nr_copies;
-       struct pollfd pfds[SD_MAX_REDUNDANCY];
-       int nr_fds, local = 0;
+       int nr_fds = 0, local = 0;
+       struct write_info wi;
 
        dprintf("%"PRIx64"\n", oid);
 
-       nr_fds = 0;
-       memset(pfds, 0, sizeof(pfds));
-       for (i = 0; i < ARRAY_SIZE(pfds); i++)
-               pfds[i].fd = -1;
+       memset(&wi, 0, sizeof(wi));
+       for (i = 0; i < SD_MAX_REDUNDANCY; i++)
+               wi.pfds[i].fd = -1;
 
        memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
        fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL;
@@ -120,24 +125,23 @@ int forward_write_obj_req(struct request *req)
                        continue;
                }
 
-               fd = get_sheep_fd(v->addr, v->port, v->node_idx, fwd_hdr.epoch);
+               fd = sheep_get_fd(v);
                if (fd < 0) {
-                       eprintf("failed to connect to %s:%"PRIu32"\n", name,
-                               v->port);
                        ret = SD_RES_NETWORK_ERROR;
                        goto err;
                }
 
                ret = send_req(fd, &fwd_hdr, req->data, &wlen);
                if (ret) { /* network errors */
-                       del_sheep_fd(fd);
+                       sheep_del_fd(v, fd);
                        ret = SD_RES_NETWORK_ERROR;
                        dprintf("fail %"PRIu32"\n", ret);
                        goto err;
                }
 
-               pfds[nr_fds].fd = fd;
-               pfds[nr_fds].events = POLLIN;
+               wi.vnodes[nr_fds] = v;
+               wi.pfds[nr_fds].fd = fd;
+               wi.pfds[nr_fds].events = POLLIN;
                nr_fds++;
        }
 
@@ -157,7 +161,7 @@ int forward_write_obj_req(struct request *req)
 
        ret = SD_RES_SUCCESS;
 again:
-       pollret = poll(pfds, nr_fds, -1);
+       pollret = poll(wi.pfds, nr_fds, -1);
        if (pollret < 0) {
                if (errno == EINTR)
                        goto again;
@@ -167,19 +171,20 @@ again:
        }
 
        for (i = 0; i < nr_fds; i++) {
-               if (pfds[i].revents & POLLERR || pfds[i].revents & POLLHUP ||
-                   pfds[i].revents & POLLNVAL) {
-                       del_sheep_fd(pfds[i].fd);
+               if (wi.pfds[i].revents & POLLERR ||
+                   wi.pfds[i].revents & POLLHUP ||
+                   wi.pfds[i].revents & POLLNVAL) {
+                       sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd);
                        ret = SD_RES_NETWORK_ERROR;
                        break;
                }
 
-               if (!(pfds[i].revents & POLLIN))
+               if (!(wi.pfds[i].revents & POLLIN))
                        continue;
 
-               if (do_read(pfds[i].fd, rsp, sizeof(*rsp))) {
+               if (do_read(wi.pfds[i].fd, rsp, sizeof(*rsp))) {
                        eprintf("failed to read a response: %m\n");
-                       del_sheep_fd(pfds[i].fd);
+                       sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd);
                        ret = SD_RES_NETWORK_ERROR;
                        break;
                }
@@ -189,11 +194,15 @@ again:
                        ret = rsp->result;
                }
 
+               sheep_put_fd(wi.vnodes[i], wi.pfds[i].fd);
                break;
        }
        if (i < nr_fds) {
                nr_fds--;
-               memmove(pfds + i, pfds + i + 1, sizeof(*pfds) * (nr_fds - i));
+               memmove(wi.pfds + i, wi.pfds + i + 1,
+                       sizeof(struct pollfd) * (nr_fds - i));
+               memmove(wi.vnodes + i, wi.vnodes + i + 1,
+                       sizeof(struct sd_vnode *) * (nr_fds - i));
        }
 
        dprintf("%"PRIx64" %"PRIu32"\n", oid, nr_fds);
@@ -204,7 +213,7 @@ out:
        return ret;
 err:
        for (i = 0; i < nr_fds; i++)
-               del_sheep_fd(pfds[i].fd);
+               sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd);
        return ret;
 }
 
diff --git a/sheep/group.c b/sheep/group.c
index b448809..19d5f40 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -803,10 +803,13 @@ static void finish_join(struct join_message *msg, struct sd_node *joined,
                if (sd_store->purge_obj &&
                    sd_store->purge_obj() != SD_RES_SUCCESS)
                        eprintf("WARN: may have stale objects\n");
+
+       sockfd_cache_add_group(nodes, nr_nodes);
 }
 
 static void update_cluster_info(struct join_message *msg,
-               struct sd_node *joined, struct sd_node *nodes, size_t nr_nodes)
+                               struct sd_node *joined, struct sd_node *nodes,
+                               size_t nr_nodes)
 {
        struct vnode_info *old_vnode_info;
 
@@ -867,6 +870,8 @@ static void update_cluster_info(struct join_message *msg,
                if (current_vnode_info->nr_zones >= sys->nr_copies)
                        sys_stat_set(SD_STATUS_OK);
        }
+
+       sockfd_cache_add(joined);
 }
 
 /*
@@ -1110,6 +1115,8 @@ void sd_leave_handler(struct sd_node *left, struct sd_node *members,
                if (current_vnode_info->nr_zones < sys->nr_copies)
                        sys_stat_set(SD_STATUS_HALT);
        }
+
+       sockfd_cache_del((struct node_id *)left);
 }
 
 int create_cluster(int port, int64_t zone, int nr_vnodes,
-- 
1.7.10.2

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to