Using different opcodes instead of the SD_FLAG_CMD_IO_LOCAL flag allows us
to completely separate the internal protocol from the client-facing one.

The only complication is that we can no longer use do_process_work for I/O
on local objects; the gateway now has to invoke the peer_*_obj methods
directly.

Signed-off-by: Christoph Hellwig <[email protected]>

---
 collie/common.c          |    2 -
 collie/vdi.c             |   12 +++++-----
 include/internal_proto.h |    9 ++++----
 sheep/gateway.c          |   24 +++++++++++----------
 sheep/ops.c              |   52 +++++++++++++++++++++++++++++++----------------
 sheep/recovery.c         |    4 +--
 sheep/sdnet.c            |   27 +++++++++---------------
 sheep/sheep_priv.h       |    8 ++++++-
 8 files changed, 79 insertions(+), 59 deletions(-)

Index: sheepdog/collie/common.c
===================================================================
--- sheepdog.orig/collie/common.c       2012-07-04 15:50:44.913702897 +0200
+++ sheepdog/collie/common.c    2012-07-04 15:55:14.823701284 +0200
@@ -103,7 +103,7 @@ int sd_write_object(uint64_t oid, uint64
        else
                hdr.opcode = SD_OP_WRITE_OBJ;
        hdr.data_length = wlen;
-       hdr.flags = (flags & ~SD_FLAG_CMD_IO_LOCAL) | SD_FLAG_CMD_WRITE;
+       hdr.flags = flags | SD_FLAG_CMD_WRITE;
 
        hdr.obj.copies = copies;
        hdr.obj.oid = oid;
Index: sheepdog/collie/vdi.c
===================================================================
--- sheepdog.orig/collie/vdi.c  2012-07-04 15:50:44.913702897 +0200
+++ sheepdog/collie/vdi.c       2012-07-04 15:55:14.823701284 +0200
@@ -282,9 +282,9 @@ static void parse_objs(uint64_t oid, obj
 
                memset(&hdr, 0, sizeof(hdr));
 
-               hdr.opcode = SD_OP_READ_OBJ;
+               hdr.opcode = SD_OP_READ_PEER;
                hdr.data_length = rlen;
-               hdr.flags = SD_FLAG_CMD_IO_LOCAL;
+               hdr.flags = 0;
                hdr.epoch = sd_epoch;
 
                hdr.obj.oid = oid;
@@ -1339,9 +1339,9 @@ static void *read_object_from(struct sd_
                exit(EXIT_FAILURE);
        }
 
-       hdr.opcode = SD_OP_READ_OBJ;
+       hdr.opcode = SD_OP_READ_PEER;
        hdr.epoch = sd_epoch;
-       hdr.flags = SD_FLAG_CMD_IO_LOCAL;
+       hdr.flags = 0;
        hdr.data_length = rlen;
 
        hdr.obj.oid = oid;
@@ -1378,9 +1378,9 @@ static void write_object_to(struct sd_vn
                exit(EXIT_FAILURE);
        }
 
-       hdr.opcode = SD_OP_WRITE_OBJ;
+       hdr.opcode = SD_OP_WRITE_PEER;
        hdr.epoch = sd_epoch;
-       hdr.flags = SD_FLAG_CMD_IO_LOCAL | SD_FLAG_CMD_WRITE;
+       hdr.flags = SD_FLAG_CMD_WRITE;
        hdr.data_length = wlen;
 
        hdr.obj.oid = oid;
Index: sheepdog/include/internal_proto.h
===================================================================
--- sheepdog.orig/include/internal_proto.h      2012-07-04 15:50:44.913702897 +0200
+++ sheepdog/include/internal_proto.h   2012-07-04 15:55:14.823701284 +0200
@@ -54,11 +54,12 @@
 #define SD_OP_STAT_RECOVERY  0x97
 #define SD_OP_FLUSH_DEL_CACHE  0x98
 #define SD_OP_GET_OBJ_LIST   0xA1
-#define SD_OP_GET_EPOCH      0XA2
+#define SD_OP_GET_EPOCH      0xA2
+#define SD_OP_CREATE_AND_WRITE_PEER 0xa3
+#define SD_OP_READ_PEER      0xa4
+#define SD_OP_WRITE_PEER     0xa5
 
-
-
-#define SD_FLAG_CMD_IO_LOCAL   0x0010
+/* 0x0010 was SD_FLAG_CMD_IO_LOCAL */
 #define SD_FLAG_CMD_RECOVERY 0x0020
 
 /* set this flag when you want to read a VDI which is opened by
Index: sheepdog/sheep/gateway.c
===================================================================
--- sheepdog.orig/sheep/gateway.c       2012-07-04 15:55:13.750367959 +0200
+++ sheepdog/sheep/gateway.c    2012-07-04 15:59:06.757033233 +0200
@@ -31,16 +31,13 @@ static int forward_read_obj_req(struct r
        uint64_t oid = req->rq.obj.oid;
        int nr_copies, j;
 
-       memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
-       fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL;
-
        nr_copies = get_nr_copies(req->vnodes);
        oid_to_vnodes(req->vnodes, oid, nr_copies, obj_vnodes);
        for (i = 0; i < nr_copies; i++) {
                v = obj_vnodes[i];
                if (!vnode_is_local(v))
                        continue;
-               ret = do_process_work(req);
+               ret = peer_read_obj(req);
                if (ret == SD_RES_SUCCESS)
                        return ret;
 
@@ -57,6 +54,9 @@ static int forward_read_obj_req(struct r
                struct sockfd *sfd;
                int idx = (i + j) % nr_copies;
 
+               memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
+               fwd_hdr.opcode = SD_OP_READ_PEER;
+
                v = obj_vnodes[idx];
                if (vnode_is_local(v))
                        continue;
@@ -88,11 +88,6 @@ static int forward_read_obj_req(struct r
                        eprintf("remote read fail %x\n", ret);
                        sheep_put_sockfd(&v->nid, sfd);
                }
-               if (i + 1 != nr_copies) {
-                       /* Reset the hdr for next read */
-                       memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
-                       fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL;
-               }
        }
        return ret;
 }
@@ -233,7 +228,10 @@ static int forward_write_obj_req(struct
 
        write_info_init(&wi);
        memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr));
-       fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL;
+       if (req->rq.opcode == SD_OP_CREATE_AND_WRITE_OBJ)
+               fwd_hdr.opcode = SD_OP_CREATE_AND_WRITE_PEER;
+       else
+               fwd_hdr.opcode = SD_OP_WRITE_PEER;
 
        wlen = fwd_hdr.data_length;
 
@@ -268,7 +266,11 @@ static int forward_write_obj_req(struct
        if (local != -1 && err_ret == SD_RES_SUCCESS) {
                v = obj_vnodes[local];
 
-               ret = do_process_work(req);
+               if (req->rq.opcode == SD_OP_CREATE_AND_WRITE_OBJ)
+                       ret = peer_create_and_write_obj(req);
+               else
+                       ret = peer_write_obj(req);
+
                if (ret != SD_RES_SUCCESS) {
                        eprintf("fail to write local %"PRIx32"\n", ret);
                        err_ret = ret;
Index: sheepdog/sheep/ops.c
===================================================================
--- sheepdog.orig/sheep/ops.c   2012-07-04 15:50:44.913702897 +0200
+++ sheepdog/sheep/ops.c        2012-07-04 15:58:07.413700254 +0200
@@ -29,7 +29,8 @@
 enum sd_op_type {
        SD_OP_TYPE_CLUSTER = 1, /* cluster operations */
        SD_OP_TYPE_LOCAL,       /* local operations */
-       SD_OP_TYPE_IO,          /* io operations */
+       SD_OP_TYPE_PEER,          /* io operations */
+       SD_OP_TYPE_GATEWAY,     /* gateway operations */
 };
 
 struct sd_op_template {
@@ -51,7 +52,7 @@ struct sd_op_template {
         * If type is SD_OP_TYPE_LOCAL, both process_work() and process_main()
         * will be called on the local node.
         *
-        * If type is SD_OP_TYPE_IO, only process_work() will be called, and it
+        * If type is SD_OP_TYPE_PEER, only process_work() will be called, and it
         * will be called on the local node.
         */
        int (*process_work)(struct request *req);
@@ -613,8 +614,7 @@ static int read_copy_from_replica(struct
                wlen = 0;
 
                memset(&hdr, 0, sizeof(hdr));
-               hdr.opcode = SD_OP_READ_OBJ;
-               hdr.flags = SD_FLAG_CMD_IO_LOCAL;
+               hdr.opcode = SD_OP_READ_PEER;
                hdr.epoch = epoch;
                hdr.data_length = rlen;
 
@@ -650,7 +650,7 @@ static int store_remove_obj(struct reque
        return sd_store->remove_object(oid);
 }
 
-static int store_read_obj(struct request *req)
+int peer_read_obj(struct request *req)
 {
        struct sd_req *hdr = &req->rq;
        struct sd_rsp *rsp = &req->rp;
@@ -703,7 +703,7 @@ static int do_write_obj(struct siocb *io
        return ret;
 }
 
-static int store_write_obj(struct request *req)
+int peer_write_obj(struct request *req)
 {
        struct sd_req *hdr = &req->rq;
        uint32_t epoch = hdr->epoch;
@@ -715,7 +715,7 @@ static int store_write_obj(struct reques
        return do_write_obj(&iocb, hdr, epoch, req->data, 0);
 }
 
-static int store_create_and_write_obj(struct request *req)
+int peer_create_and_write_obj(struct request *req)
 {
        struct sd_req *hdr = &req->rq;
        struct sd_req cow_hdr;
@@ -918,26 +918,39 @@ static struct sd_op_template sd_ops[] =
                .process_main = local_trace_cat_ops,
        },
 
-       /* I/O operations */
+       /* gateway I/O operations */
        [SD_OP_CREATE_AND_WRITE_OBJ] = {
-               .type = SD_OP_TYPE_IO,
-               .process_work = store_create_and_write_obj,
+               .type = SD_OP_TYPE_GATEWAY,
        },
 
        [SD_OP_READ_OBJ] = {
-               .type = SD_OP_TYPE_IO,
-               .process_work = store_read_obj,
+               .type = SD_OP_TYPE_GATEWAY,
        },
 
        [SD_OP_WRITE_OBJ] = {
-               .type = SD_OP_TYPE_IO,
-               .process_work = store_write_obj,
+               .type = SD_OP_TYPE_GATEWAY,
        },
 
+       /* peer I/O operations */
        [SD_OP_REMOVE_OBJ] = {
-               .type = SD_OP_TYPE_IO,
+               .type = SD_OP_TYPE_PEER,
                .process_work = store_remove_obj,
        },
+
+       [SD_OP_CREATE_AND_WRITE_PEER] = {
+               .type = SD_OP_TYPE_PEER,
+               .process_work = peer_create_and_write_obj,
+       },
+
+       [SD_OP_READ_PEER] = {
+               .type = SD_OP_TYPE_PEER,
+               .process_work = peer_read_obj,
+       },
+
+       [SD_OP_WRITE_PEER] = {
+               .type = SD_OP_TYPE_PEER,
+               .process_work = peer_write_obj,
+       },
 };
 
 struct sd_op_template *get_sd_op(uint8_t opcode)
@@ -958,9 +971,14 @@ int is_local_op(struct sd_op_template *o
        return op->type == SD_OP_TYPE_LOCAL;
 }
 
-int is_io_op(struct sd_op_template *op)
+int is_peer_op(struct sd_op_template *op)
+{
+       return op->type == SD_OP_TYPE_PEER;
+}
+
+int is_gateway_op(struct sd_op_template *op)
 {
-       return op->type == SD_OP_TYPE_IO;
+       return op->type == SD_OP_TYPE_GATEWAY;
 }
 
 int is_force_op(struct sd_op_template *op)
Index: sheepdog/sheep/recovery.c
===================================================================
--- sheepdog.orig/sheep/recovery.c      2012-07-04 15:50:44.913702897 +0200
+++ sheepdog/sheep/recovery.c   2012-07-04 15:55:14.823701284 +0200
@@ -103,9 +103,9 @@ static int recover_object_from_replica(u
        }
 
        memset(&hdr, 0, sizeof(hdr));
-       hdr.opcode = SD_OP_READ_OBJ;
+       hdr.opcode = SD_OP_READ_PEER;
        hdr.epoch = epoch;
-       hdr.flags = SD_FLAG_CMD_RECOVERY | SD_FLAG_CMD_IO_LOCAL;
+       hdr.flags = SD_FLAG_CMD_RECOVERY;
        hdr.data_length = rlen;
 
        hdr.obj.oid = oid;
Index: sheepdog/sheep/sdnet.c
===================================================================
--- sheepdog.orig/sheep/sdnet.c 2012-07-04 15:50:44.913702897 +0200
+++ sheepdog/sheep/sdnet.c      2012-07-04 15:56:17.503700912 +0200
@@ -181,14 +181,14 @@ void resume_wait_epoch_requests(void)
                         * Gateway retries to send the request when
                         * its epoch changes.
                         */
-                       assert(!(req->rq.flags & SD_FLAG_CMD_IO_LOCAL));
+                       assert(is_gateway_op(req->op));
                        req->rq.epoch = sys->epoch;
                        list_del(&req->request_list);
                        requeue_request(req);
                        break;
                case SD_RES_NEW_NODE_VER:
                        /* Peer retries the request locally when its epoch changes. */
-                       assert(req->rq.flags & SD_FLAG_CMD_IO_LOCAL);
+                       assert(!is_gateway_op(req->op));
                        list_del(&req->request_list);
                        requeue_request(req);
                        break;
@@ -252,7 +252,7 @@ void flush_wait_obj_requests(void)
        }
 }
 
-static void queue_io_request(struct request *req)
+static void queue_peer_request(struct request *req)
 {
        req->local_oid = req->rq.obj.oid;
        if (req->local_oid) {
@@ -340,15 +340,6 @@ static void queue_request(struct request
        }
 
        /*
-        * we set epoch for non direct requests here.  Note that we need to
-        * sample sys->epoch before passing requests to worker threads as
-        * it can change anytime we return to processing membership change
-        * events.
-        */
-       if (!(hdr->flags & SD_FLAG_CMD_IO_LOCAL))
-               hdr->epoch = sys->epoch;
-
-       /*
         * force operations shouldn't access req->vnodes in their
         * process_work() and process_main() because they can be
         * called before we set up current_vnode_info
@@ -356,14 +347,16 @@ static void queue_request(struct request
        if (!is_force_op(req->op))
                req->vnodes = get_vnode_info();
 
-       if (is_io_op(req->op)) {
-               if (req->rq.flags & SD_FLAG_CMD_IO_LOCAL)
-                       queue_io_request(req);
-               else
-                       queue_gateway_request(req);
+       if (is_peer_op(req->op)) {
+               queue_peer_request(req);
+       } else if (is_gateway_op(req->op)) {
+               hdr->epoch = sys->epoch;
+               queue_gateway_request(req);
        } else if (is_local_op(req->op)) {
+               hdr->epoch = sys->epoch;
                queue_local_request(req);
        } else if (is_cluster_op(req->op)) {
+               hdr->epoch = sys->epoch;
                queue_cluster_request(req);
        } else {
                eprintf("unknown operation %d\n", hdr->opcode);
Index: sheepdog/sheep/sheep_priv.h
===================================================================
--- sheepdog.orig/sheep/sheep_priv.h    2012-07-04 15:55:13.750367959 +0200
+++ sheepdog/sheep/sheep_priv.h 2012-07-04 15:58:29.173700123 +0200
@@ -288,7 +288,8 @@ void put_request(struct request *req);
 struct sd_op_template *get_sd_op(uint8_t opcode);
 int is_cluster_op(struct sd_op_template *op);
 int is_local_op(struct sd_op_template *op);
-int is_io_op(struct sd_op_template *op);
+int is_peer_op(struct sd_op_template *op);
+int is_gateway_op(struct sd_op_template *op);
 int is_force_op(struct sd_op_template *op);
 int has_process_work(struct sd_op_template *op);
 int has_process_main(struct sd_op_template *op);
@@ -366,6 +367,11 @@ static inline int sys_can_halt(void)
        return sys_stat_ok() && !sys_flag_nohalt();
 }
 
+/* backend store */
+int peer_read_obj(struct request *req);
+int peer_write_obj(struct request *req);
+int peer_create_and_write_obj(struct request *req);
+
 /* object_cache */
 
 int bypass_object_cache(struct request *req);

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to