From: levin li <[email protected]>

In read/write/remove_object(), we directly call forward_*_obj_req() to
forward the request to peer nodes, but without the retry mechanism that
the gateway provides. Queue a local gateway request in these routines
instead, so that they take advantage of the gateway's retry mechanism.

Signed-off-by: levin li <[email protected]>
---
 sheep/sdnet.c      |   67 +++++++++++++++++++++++++++++++----
 sheep/sheep_priv.h |    6 ++++
 sheep/store.c      |  100 +++++++++++++++-------------------------------------
 3 files changed, 96 insertions(+), 77 deletions(-)

diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index 34d65cf..dc860f5 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -13,6 +13,7 @@
 #include <stdlib.h>
 #include <unistd.h>
 #include <netdb.h>
+#include <pthread.h>
 #include <arpa/inet.h>
 #include <netinet/tcp.h>
 #include <sys/epoll.h>
@@ -384,6 +385,50 @@ static void requeue_request(struct request *req)
 static void client_incref(struct client_info *ci);
 static void client_decref(struct client_info *ci);
 
+static struct request *alloc_local_request(void *data, int data_length)
+{
+       struct request *req;
+
+       req = zalloc(sizeof(struct request));
+       if (data_length) {
+               req->data_length = data_length;
+               req->data = data;
+       }
+
+       req->local = 1;
+       pthread_mutex_init(&req->wait_mutex, NULL);
+       pthread_cond_init(&req->wait_cond, NULL);
+
+       INIT_LIST_HEAD(&req->request_list);
+
+       sys->nr_outstanding_reqs++;
+       sys->outstanding_data_size += data_length;
+
+       return req;
+}
+
+int exec_local_req(struct sd_req *rq, void *data, int data_length)
+{
+       struct request *req;
+       int ret;
+
+       req = alloc_local_request(data, data_length);
+       req->rq = *rq;
+       req->rq.data_length = data_length;
+
+       queue_request(req);
+
+       pthread_mutex_lock(&req->wait_mutex);
+       while (!req->done)
+               pthread_cond_wait(&req->wait_cond, &req->wait_mutex);
+       pthread_mutex_unlock(&req->wait_mutex);
+
+       ret = req->rp.result;
+       free(req);
+
+       return ret;
+}
+
 static struct request *alloc_request(struct client_info *ci, int data_length)
 {
        struct request *req;
@@ -425,14 +470,24 @@ void req_done(struct request *req)
 {
        struct client_info *ci = req->ci;
 
-       if (conn_tx_on(&ci->conn)) {
-               dprintf("connection seems to be dead\n");
-               free_request(req);
+       if (req->local) {
+               req->done = 1;
+               sys->nr_outstanding_reqs--;
+               sys->outstanding_data_size -= req->data_length;
+
+               pthread_mutex_lock(&req->wait_mutex);
+               pthread_cond_signal(&req->wait_cond);
+               pthread_mutex_unlock(&req->wait_mutex);
        } else {
-               list_add(&req->request_list, &ci->done_reqs);
-       }
+               if (conn_tx_on(&ci->conn)) {
+                       dprintf("connection seems to be dead\n");
+                       free_request(req);
+               } else {
+                       list_add(&req->request_list, &ci->done_reqs);
+               }
 
-       client_decref(ci);
+               client_decref(ci);
+       }
 }
 
 static void init_rx_hdr(struct client_info *ci)
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index afdaad8..2f07394 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -73,6 +73,11 @@ struct request {
        struct list_head request_list;
        struct list_head pending_list;
 
+       int local;
+       int done;
+       pthread_mutex_t wait_mutex;
+       pthread_cond_t wait_cond;
+
        uint64_t local_oid;
 
        struct vnode_info *vnodes;
@@ -294,6 +299,7 @@ int remove_object(struct vnode_info *vnodes, uint32_t node_version,
 
 void del_sheep_fd(int fd);
 int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch);
+int exec_local_req(struct sd_req *rq, void *data, int data_length);
 
 int prealloc(int fd, uint32_t size);
 
diff --git a/sheep/store.c b/sheep/store.c
index d5a080b..5b62cc5 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -517,8 +517,7 @@ int write_object(struct vnode_info *vnodes, uint32_t epoch,
                 uint64_t oid, char *data, unsigned int datalen,
                 uint64_t offset, uint16_t flags, int nr_copies, int create)
 {
-       struct request write_req;
-       struct sd_req *hdr = &write_req.rq;
+       struct sd_req hdr;
        int ret;
 
        if (sys->enable_write_cache && object_is_cached(oid)) {
@@ -531,23 +530,18 @@ int write_object(struct vnode_info *vnodes, uint32_t epoch,
                }
        }
 
-       memset(&write_req, 0, sizeof(write_req));
-       hdr->opcode = create ? SD_OP_CREATE_AND_WRITE_OBJ : SD_OP_WRITE_OBJ;
-       hdr->flags = SD_FLAG_CMD_WRITE;
-       hdr->data_length = datalen;
-       hdr->epoch = epoch;
+       memset(&hdr, 0, sizeof(hdr));
+       hdr.opcode = create ? SD_OP_CREATE_AND_WRITE_OBJ : SD_OP_WRITE_OBJ;
+       hdr.flags = SD_FLAG_CMD_WRITE;
 
-       hdr->obj.oid = oid;
-       hdr->obj.offset = offset;
-       hdr->obj.copies = nr_copies;
+       hdr.obj.oid = oid;
+       hdr.obj.offset = offset;
+       hdr.obj.copies = nr_copies;
 
-       write_req.data = data;
-       write_req.op = get_sd_op(hdr->opcode);
-       write_req.vnodes = vnodes;
-
-       ret = forward_write_obj_req(&write_req);
+       ret = exec_local_req(&hdr, data, datalen);
        if (ret != SD_RES_SUCCESS)
-               eprintf("failed to forward write object %x\n", ret);
+               eprintf("failed to write object %" PRIx64 ", %x\n", oid, ret);
+
        return ret;
 }
 
@@ -559,8 +553,7 @@ int read_object(struct vnode_info *vnodes, uint32_t epoch,
                uint64_t oid, char *data, unsigned int datalen,
                uint64_t offset, int nr_copies)
 {
-       struct request read_req;
-       struct sd_req *hdr = &read_req.rq;
+       struct sd_req hdr;
        int ret;
 
        if (sys->enable_write_cache && object_is_cached(oid)) {
@@ -573,23 +566,17 @@ int read_object(struct vnode_info *vnodes, uint32_t epoch,
                }
                return ret;
        }
-       memset(&read_req, 0, sizeof(read_req));
-forward_read:
-       hdr->opcode = SD_OP_READ_OBJ;
-       hdr->data_length = datalen;
-       hdr->epoch = epoch;
-
-       hdr->obj.oid = oid;
-       hdr->obj.offset = offset;
-       hdr->obj.copies = nr_copies;
 
-       read_req.data = data;
-       read_req.op = get_sd_op(hdr->opcode);
-       read_req.vnodes = vnodes;
+forward_read:
+       memset(&hdr, 0, sizeof(hdr));
+       hdr.opcode = SD_OP_READ_OBJ;
+       hdr.obj.oid = oid;
+       hdr.obj.offset = offset;
+       hdr.obj.copies = nr_copies;
 
-       ret = forward_read_obj_req(&read_req);
+       ret = exec_local_req(&hdr, data, datalen);
        if (ret != SD_RES_SUCCESS)
-               eprintf("failed to forward read object %x\n", ret);
+               eprintf("failed to read object %" PRIx64 ", %x\n", oid, ret);
 
        return ret;
 }
@@ -597,49 +584,20 @@ forward_read:
 int remove_object(struct vnode_info *vnodes, uint32_t epoch,
                  uint64_t oid, int nr)
 {
-       struct sd_vnode *obj_vnodes[SD_MAX_COPIES];
-       int err = 0, i = 0;
-
-       oid_to_vnodes(vnodes, oid, nr, obj_vnodes);
-       for (i = 0; i < nr; i++) {
-               struct sd_req hdr;
-               struct sd_rsp *rsp = (struct sd_rsp *)&hdr;
-               struct sd_vnode *v;
-               unsigned wlen = 0, rlen = 0;
-               char name[128];
-               int fd, ret;
-
-               v = obj_vnodes[i];
-               addr_to_str(name, sizeof(name), v->addr, 0);
-
-               fd = connect_to(name, v->port);
-               if (fd < 0) {
-                       rsp->result = SD_RES_NETWORK_ERROR;
-                       return -1;
-               }
-
-               memset(&hdr, 0, sizeof(hdr));
-               hdr.epoch = epoch;
-               hdr.opcode = SD_OP_REMOVE_OBJ;
-               hdr.flags = 0;
-               hdr.data_length = rlen;
-
-               hdr.obj.oid = oid;
-
-               ret = exec_req(fd, &hdr, NULL, &wlen, &rlen);
-               close(fd);
+       struct sd_req hdr;
+       int ret;
 
-               if (ret)
-                       return -1;
+       memset(&hdr, 0, sizeof(hdr));
+       hdr.opcode = SD_OP_REMOVE_OBJ;
+       hdr.obj.oid = oid;
+       hdr.obj.copies = nr;
 
-               if (rsp->result != SD_RES_SUCCESS)
-                       err = 1;
-       }
+       ret = exec_local_req(&hdr, NULL, 0);
 
-       if (err)
-               return -1;
+       if (ret != SD_RES_SUCCESS)
+               eprintf("failed to remove object %" PRIx64 ", %x\n", oid, ret);
 
-       return 0;
+       return ret;
 }
 
 int set_cluster_copies(uint8_t copies)
-- 
1.7.10

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to