From: Liu Yuan <[email protected]>

Signed-off-by: Liu Yuan <[email protected]>
---
 sheep/gateway.c |  210 +++++++++++++++++++++++++++++++------------------------
 1 file changed, 120 insertions(+), 90 deletions(-)

diff --git a/sheep/gateway.c b/sheep/gateway.c
index f126dfb..c45490d 100644
--- a/sheep/gateway.c
+++ b/sheep/gateway.c
@@ -89,29 +89,115 @@ struct write_info {
        struct pollfd pfds[SD_MAX_REDUNDANCY];
        struct sd_vnode *vnodes[SD_MAX_REDUNDANCY];
        int sock_idx[SD_MAX_REDUNDANCY];
+       int nr_sent;
 };
 
+/* Drop slot 'pos' from wi, closing the gap in its three parallel arrays. */
+static inline void update_write_info(struct write_info *wi, int pos)
+{
+       int nxt = pos + 1, n;
+
+       dprintf("%d, %d\n", wi->nr_sent, pos);
+       n = --wi->nr_sent - pos;        /* slots after 'pos' to shift down */
+       memmove(&wi->pfds[pos], &wi->pfds[nxt], sizeof(*wi->pfds) * n);
+       memmove(&wi->vnodes[pos], &wi->vnodes[nxt], sizeof(*wi->vnodes) * n);
+       memmove(&wi->sock_idx[pos], &wi->sock_idx[nxt], sizeof(int) * n);
+}
+
+/* Hand slot i's fd back via sheep_put_fd(), then remove the slot from wi. */
+static inline void finish_one_write(struct write_info *wi, int i)
+{
+       sheep_put_fd(wi->vnodes[i], wi->pfds[i].fd, wi->sock_idx[i]);
+       update_write_info(wi, i);
+}
+
+/* Discard slot i's fd via sheep_del_fd(), then remove the slot from wi. */
+static inline void finish_one_write_err(struct write_info *wi, int i)
+{
+       sheep_del_fd(wi->vnodes[i], wi->pfds[i].fd, wi->sock_idx[i]);
+       update_write_info(wi, i);
+}
+
+/*
+ * Wait until every forwarded write in 'wi' has completed or failed.
+ *
+ * Even if something goes wrong, we have to wait for all forwarded writes to
+ * finish, to avoid interleaved requests.  Each response is read into 'rsp'.
+ *
+ * Return SD_RES_SUCCESS, or the most recent error seen if any request failed.
+ */
+static int wait_forward_write(struct write_info *wi, struct sd_rsp *rsp)
+{
+       int nr_sent, err_ret = SD_RES_SUCCESS, ret, pollret, i;
+again:
+       pollret = poll(wi->pfds, wi->nr_sent, -1);
+       if (pollret < 0) {
+               if (errno == EINTR)
+                       goto again;
+
+               panic("%m\n");
+       }
+
+       nr_sent = wi->nr_sent;
+       /* Pick any slot with an event: errors can arrive without POLLIN. */
+       for (i = 0; i < nr_sent; i++)
+               if (wi->pfds[i].revents)
+                       break;
+       if (i < nr_sent) {
+               int re = wi->pfds[i].revents;
+               dprintf("%d, revents %x\n", i, re);
+               if (re & (POLLERR | POLLHUP | POLLNVAL)) {
+                       err_ret = SD_RES_NETWORK_ERROR;
+                       finish_one_write_err(wi, i);
+               } else if (re & POLLIN) {
+                       if (do_read(wi->pfds[i].fd, rsp, sizeof(*rsp))) {
+                               eprintf("remote node might be crashed\n");
+                               err_ret = SD_RES_NETWORK_ERROR;
+                               finish_one_write_err(wi, i);
+                               goto finish_write;
+                       }
+                       ret = rsp->result;
+                       if (ret != SD_RES_SUCCESS) {
+                               eprintf("fail %"PRIu32"\n", ret);
+                               err_ret = ret;
+                       }
+                       finish_one_write(wi, i);
+               } else {
+                       eprintf("unhanlded poll event\n");
+               }
+       }
+finish_write:
+       if (wi->nr_sent > 0)
+               goto again;
+
+       return err_ret;
+}
+
+/* Mark every slot of wi unused (fd -1, vnode NULL) and zero the sent count. */
+static void init_write_info(struct write_info *wi)
+{
+       int slot = wi->nr_sent = 0;
+       for (; slot < SD_MAX_REDUNDANCY; slot++) {
+               wi->vnodes[slot] = NULL;
+               wi->pfds[slot].fd = -1;
+       }
+}
+
 int forward_write_obj_req(struct request *req)
 {
-       int i, fd, ret, pollret;
+       int i, fd, err_ret = SD_RES_SUCCESS, ret;
        unsigned wlen;
-       char name[128];
        struct sd_req fwd_hdr;
        struct sd_rsp *rsp = (struct sd_rsp *)&req->rp;
        struct sd_vnode *v;
        struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
        uint64_t oid = req->rq.obj.oid;
        int nr_copies;
-       int nr_fds = 0, local = 0;
        struct write_info wi;
 
        dprintf("%"PRIx64"\n", oid);
 
-       for (i = 0; i < SD_MAX_REDUNDANCY; i++) {
-               wi.pfds[i].fd = -1;
-               wi.vnodes[i] = NULL;
-       }
-
+       init_write_info(&wi);
        memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
        fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL;
 
@@ -121,107 +207,51 @@ int forward_write_obj_req(struct request *req)
        oid_to_vnodes(req->vnodes, oid, nr_copies, obj_vnodes);
        for (i = 0; i < nr_copies; i++) {
                v = obj_vnodes[i];
-
-               addr_to_str(name, sizeof(name), v->addr, 0);
-
-               if (vnode_is_local(v)) {
-                       local = 1;
+               if (!vnode_is_local(v))
                        continue;
-               }
-
-               fd = sheep_get_fd(v, &wi.sock_idx[nr_fds]);
-               if (fd < 0) {
-                       ret = SD_RES_NETWORK_ERROR;
-                       goto err;
-               }
-
-               ret = send_req(fd, &fwd_hdr, req->data, &wlen);
-               if (ret) { /* network errors */
-                       sheep_del_fd(v, fd, wi.sock_idx[nr_fds]);
-                       ret = SD_RES_NETWORK_ERROR;
-                       dprintf("fail %"PRIu32"\n", ret);
-                       goto err;
-               }
 
-               wi.vnodes[nr_fds] = v;
-               wi.pfds[nr_fds].fd = fd;
-               wi.pfds[nr_fds].events = POLLIN;
-               nr_fds++;
-       }
-
-       if (local) {
                ret = do_local_io(req, fwd_hdr.epoch);
-               rsp->result = ret;
 
-               if (rsp->result != SD_RES_SUCCESS) {
+               if (ret != SD_RES_SUCCESS) {
                        eprintf("fail to write local %"PRIu32"\n", ret);
-                       ret = rsp->result;
-                       goto err;
+                       return ret;
                }
-
-               if (nr_fds == 0)
-                       goto out;
        }
 
-       ret = SD_RES_SUCCESS;
-again:
-       pollret = poll(wi.pfds, nr_fds, -1);
-       if (pollret < 0) {
-               if (errno == EINTR)
-                       goto again;
+       for (i = 0; i < nr_copies; i++) {
+               v = obj_vnodes[i];
 
-               ret = SD_RES_NETWORK_ERROR;
-               goto err;
-       }
+               if (vnode_is_local(v))
+                       continue;
 
-       for (i = 0; i < nr_fds; i++) {
-               if (wi.pfds[i].revents & POLLERR ||
-                   wi.pfds[i].revents & POLLHUP ||
-                   wi.pfds[i].revents & POLLNVAL) {
-                       sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd,
-                                    wi.sock_idx[i]);
-                       ret = SD_RES_NETWORK_ERROR;
+               fd = sheep_get_fd(v, &wi.sock_idx[wi.nr_sent]);
+               if (fd < 0) {
+                       err_ret = SD_RES_NETWORK_ERROR;
                        break;
                }
 
-               if (!(wi.pfds[i].revents & POLLIN))
-                       continue;
-
-               if (do_read(wi.pfds[i].fd, rsp, sizeof(*rsp))) {
-                       eprintf("failed to read a response: %m\n");
-                       sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd,
-                                    wi.sock_idx[i]);
-                       ret = SD_RES_NETWORK_ERROR;
+               ret = send_req(fd, &fwd_hdr, req->data, &wlen);
+               if (ret) {
+                       sheep_del_fd(v, fd, wi.sock_idx[wi.nr_sent]);
+                       err_ret = SD_RES_NETWORK_ERROR;
+                       dprintf("fail %"PRIu32"\n", ret);
                        break;
                }
 
-               if (rsp->result != SD_RES_SUCCESS) {
-                       eprintf("fail %"PRIu32"\n", rsp->result);
-                       ret = rsp->result;
-               }
-               sheep_put_fd(wi.vnodes[i], wi.pfds[i].fd, wi.sock_idx[i]);
-               break;
-       }
-       if (i < nr_fds) {
-               nr_fds--;
-               memmove(wi.pfds + i, wi.pfds + i + 1,
-                       sizeof(struct pollfd) * (nr_fds - i));
-               memmove(wi.vnodes + i, wi.vnodes + i + 1,
-                       sizeof(struct sd_vnode *) * (nr_fds - i));
-               memmove(wi.sock_idx + i, wi.sock_idx + i + 1,
-                       sizeof(int) * (nr_fds - i));
+               wi.vnodes[wi.nr_sent] = v;
+               wi.pfds[wi.nr_sent].fd = fd;
+               wi.pfds[wi.nr_sent].events = POLLIN;
+               wi.nr_sent++;
        }
 
-       dprintf("%"PRIx64" %"PRIu32"\n", oid, nr_fds);
+       dprintf("nr_sent %d, err %d\n", wi.nr_sent, err_ret);
+       if (wi.nr_sent > 0) {
+               ret = wait_forward_write(&wi, rsp);
+               if (ret != SD_RES_SUCCESS)
+                       err_ret = ret;
+       }
 
-       if (nr_fds > 0)
-               goto again;
-out:
-       return ret;
-err:
-       for (i = 0; i < nr_fds; i++)
-               sheep_del_fd(wi.vnodes[i], wi.pfds[i].fd, wi.sock_idx[i]);
-       return ret;
+       return err_ret;
 }
 
 void do_gateway_request(struct work *work)
-- 
1.7.10.2

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to