This patch uses poll() when collie forwards write requests to multiple
nodes.

Signed-off-by: MORITA Kazutaka <[email protected]>
---
 collie/store.c           |  211 ++++++++++++++++++++++++++++++++++++----------
 include/sheepdog_proto.h |    1 +
 2 files changed, 169 insertions(+), 43 deletions(-)

diff --git a/collie/store.c b/collie/store.c
index 72c079c..0fa711e 100644
--- a/collie/store.c
+++ b/collie/store.c
@@ -15,6 +15,7 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <unistd.h>
+#include <poll.h>
 #include <sys/xattr.h>
 #include <sys/statvfs.h>
 
@@ -323,21 +324,90 @@ static int read_from_other_sheeps(uint64_t oid, char *buf, int copies)
 
 static int store_queue_request_local(struct request *req, char *buf, uint32_t epoch);
 
-static int forward_obj_req(struct request *req, char *buf)
+static int forward_read_obj_req(struct request *req, char *buf)
 {
        int i, n, nr, fd, ret;
        unsigned wlen, rlen;
        char name[128];
        struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
+       struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)hdr;
        struct sheepdog_node_list_entry *e;
-       struct sd_obj_req hdr2;
-       struct sd_rsp *rsp = (struct sd_rsp *)&hdr2;
        uint64_t oid = hdr->oid;
        int copies;
-       uint32_t epoch;
 
        e = zalloc(SD_MAX_NODES * sizeof(struct sheepdog_node_list_entry));
-again:
+       if (!e)
+               return SD_RES_NO_MEM;
+
+       nr = get_ordered_sd_node_list(e);
+
+       copies = hdr->copies;
+
+       /* temporary hack */
+       if (!copies)
+               copies = sys->nr_sobjs;
+
+       hdr->flags |= SD_FLAG_CMD_FORWARD;
+       hdr->epoch = sys->epoch;
+
+       /* TODO: we can do better; we need to check this first */
+       for (i = 0; i < copies; i++) {
+               n = obj_to_sheep(e, nr, oid, i);
+
+               if (is_myself(&e[n])) {
+                       ret = store_queue_request_local(req, buf, sys->epoch);
+                       goto out;
+               }
+       }
+
+       n = obj_to_sheep(e, nr, oid, 0);
+
+       addr_to_str(name, sizeof(name), e[n].addr, 0);
+
+       fd = connect_to(name, e[n].port);
+       if (fd < 0) {
+               ret = SD_RES_EIO;
+               goto out;
+       }
+
+       wlen = 0;
+       rlen = hdr->data_length;
+
+       ret = exec_req(fd, (struct sd_req *)hdr, req->data, &wlen, &rlen);
+
+       close(fd);
+
+       if (ret) /* network errors */
+               ret = SD_RES_EIO;
+       else {
+               memcpy(&req->rp, rsp, sizeof(*rsp));
+               ret = rsp->result;
+       }
+
+out:
+       free(e);
+
+       return ret;
+}
+
+static int forward_write_obj_req(struct request *req, char *buf)
+{
+       int i, n, nr, fd, ret;
+       unsigned wlen, rlen;
+       char name[128];
+       struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
+       struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp;
+       struct sheepdog_node_list_entry *e;
+       uint64_t oid = hdr->oid;
+       int copies;
+       struct pollfd pfds[SD_MAX_REDUNDANCY];
+       int done, nr_fds, local = 0;
+
+       dprintf("%lx\n", oid);
+       e = zalloc(SD_MAX_NODES * sizeof(struct sheepdog_node_list_entry));
+       if (!e)
+               return SD_RES_NO_MEM;
+
        nr = get_ordered_sd_node_list(e);
 
        copies = hdr->copies;
@@ -346,65 +416,117 @@ again:
        if (!copies)
                copies = sys->nr_sobjs;
 
+       nr_fds = 0;
+       done = 0;
+       memset(pfds, 0, sizeof(pfds));
+       for (i = 0; i < ARRAY_SIZE(pfds); i++)
+               pfds[i].fd = -1;
+
+       hdr->flags |= SD_FLAG_CMD_FORWARD;
+       hdr->epoch = sys->epoch;
+
+       wlen = hdr->data_length;
+       rlen = 0;
+
        for (i = 0; i < copies; i++) {
                n = obj_to_sheep(e, nr, oid, i);
 
                addr_to_str(name, sizeof(name), e[n].addr, 0);
 
-               /* TODO: we can do better; we need to chech this first */
                if (is_myself(&e[n])) {
-                       if (hdr->flags & SD_FLAG_CMD_RECOVERY)
-                               epoch = hdr->tgt_epoch;
-                       else
-                               epoch = sys->epoch;
-                       ret = store_queue_request_local(req, buf, epoch);
-                       memcpy(rsp, &req->rp, sizeof(*rsp));
-                       rsp->result = ret;
-                       goto done;
+                       local = 1;
+                       continue;
                }
 
                fd = connect_to(name, e[n].port);
-               if (fd < 0)
-                       goto again;
-
-               memcpy(&hdr2, hdr, sizeof(hdr2));
+               if (fd < 0) {
+                       eprintf("failed to connect to %s:%d\n", name, e[n].port);
+                       ret = SD_RES_EIO;
+                       goto out;
+               }
 
-               if (hdr->flags & SD_FLAG_CMD_WRITE) {
-                       wlen = hdr->data_length;
-                       rlen = 0;
-               } else {
-                       wlen = 0;
-                       rlen = hdr->data_length;
+               ret = send_req(fd, (struct sd_req *)hdr, req->data, &wlen);
+               if (ret) { /* network errors */
+                       ret = SD_RES_EIO;
+                       dprintf("fail %d\n", ret);
+                       goto out;
                }
 
-               hdr2.flags |= SD_FLAG_CMD_FORWARD;
-               hdr2.epoch = sys->epoch;
+               pfds[nr_fds].fd = fd;
+               pfds[nr_fds].events = POLLIN;
+               nr_fds++;
+       }
 
-               ret = exec_req(fd, (struct sd_req *)&hdr2, req->data, &wlen, &rlen);
+       if (local) {
+               ret = store_queue_request_local(req, buf, sys->epoch);
+               rsp->result = ret;
 
-               close(fd);
+               if (nr_fds == 0) {
+                       eprintf("exit %d\n", ret);
+                       goto out;
+               }
+
+               if (rsp->result != SD_RES_SUCCESS) {
+                       eprintf("fail %d\n", ret);
+                       goto out;
+               }
+       }
 
-               if (ret) /* network errors */
+again:
+       ret = poll(pfds, nr_fds, -1);
+
+       if (ret < 0) {
+               if (errno == EINTR)
                        goto again;
 
-       done:
-               if (hdr->flags & SD_FLAG_CMD_WRITE) {
-                       if (rsp->result != SD_RES_SUCCESS) {
-                               free(e);
-                               return rsp->result;
-                       }
-               } else {
-                       if (rsp->result == SD_RES_SUCCESS) {
-                               memcpy(&req->rp, rsp, sizeof(req->rp));
-                               free(e);
-                               return SD_RES_SUCCESS;
-                       }
+               ret = SD_RES_EIO;
+               goto out;
+       }
+
+       for (i = 0; i < nr_fds; i++) {
+               if (pfds[i].fd < 0)
+                       continue;
+
+               if (pfds[i].revents & POLLERR || pfds[i].revents & POLLHUP) {
+                       ret = SD_RES_EIO;
+                       goto out;
                }
+
+               if (!(pfds[i].revents & POLLIN))
+                       continue;
+
+               ret = do_read(pfds[i].fd, rsp, sizeof(*rsp));
+
+               if (ret) {
+                       eprintf("failed to get a rsp, %m\n");
+                       ret = SD_RES_EIO;
+                       goto out;
+               }
+
+               if (rsp->result != SD_RES_SUCCESS) {
+                       eprintf("fail %d\n", rsp->result);
+                       ret = rsp->result;
+                       goto out;
+               }
+
+               done++;
        }
 
+       dprintf("%lx %d %d\n", oid, nr_fds, done);
+
+       if (done != nr_fds)
+               goto again;
+
+       ret = SD_RES_SUCCESS;
+out:
        free(e);
 
-       return (hdr->flags & SD_FLAG_CMD_WRITE) ? SD_RES_SUCCESS: rsp->result;
+       for (i = 0; i < ARRAY_SIZE(pfds); i++){
+               if (pfds[i].fd >= 0)
+                       close(pfds[i].fd);
+       }
+
+       return ret;
 }
 
 static int check_epoch(struct request *req)
@@ -638,7 +760,10 @@ void store_queue_request(struct work *work, int idx)
        }
 
        if (!(hdr->flags & SD_FLAG_CMD_FORWARD)) {
-               ret = forward_obj_req(req, buf);
+               if (hdr->flags & SD_FLAG_CMD_WRITE)
+                       ret = forward_write_obj_req(req, buf);
+               else
+                       ret = forward_read_obj_req(req, buf);
                goto out;
        }
 
diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h
index b2c0fb8..c66f8bb 100644
--- a/include/sheepdog_proto.h
+++ b/include/sheepdog_proto.h
@@ -20,6 +20,7 @@
 #define SD_MAX_NODES 1024
 #define SD_MAX_VMS   4096
 #define SD_MAX_VDI_LEN 256
+#define SD_MAX_REDUNDANCY 8
 
 /* -> vmon */
 
-- 
1.5.6.5

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to