Using different opcodes instead of the SD_FLAG_CMD_IO_LOCAL flag allows us to completely separate the internal protocol from the client-facing one.
The only complication is that we can't use do_process_work for I/O on local objects now, but need to invoke the methods directly. Signed-off-by: Christoph Hellwig <[email protected]> --- collie/common.c | 2 - collie/vdi.c | 12 +++++----- include/internal_proto.h | 9 ++++---- sheep/gateway.c | 24 +++++++++++---------- sheep/ops.c | 52 +++++++++++++++++++++++++++++++---------------- sheep/recovery.c | 4 +-- sheep/sdnet.c | 27 +++++++++--------------- sheep/sheep_priv.h | 8 ++++++- 8 files changed, 79 insertions(+), 59 deletions(-) Index: sheepdog/collie/common.c =================================================================== --- sheepdog.orig/collie/common.c 2012-07-04 15:50:44.913702897 +0200 +++ sheepdog/collie/common.c 2012-07-04 15:55:14.823701284 +0200 @@ -103,7 +103,7 @@ int sd_write_object(uint64_t oid, uint64 else hdr.opcode = SD_OP_WRITE_OBJ; hdr.data_length = wlen; - hdr.flags = (flags & ~SD_FLAG_CMD_IO_LOCAL) | SD_FLAG_CMD_WRITE; + hdr.flags = flags | SD_FLAG_CMD_WRITE; hdr.obj.copies = copies; hdr.obj.oid = oid; Index: sheepdog/collie/vdi.c =================================================================== --- sheepdog.orig/collie/vdi.c 2012-07-04 15:50:44.913702897 +0200 +++ sheepdog/collie/vdi.c 2012-07-04 15:55:14.823701284 +0200 @@ -282,9 +282,9 @@ static void parse_objs(uint64_t oid, obj memset(&hdr, 0, sizeof(hdr)); - hdr.opcode = SD_OP_READ_OBJ; + hdr.opcode = SD_OP_READ_PEER; hdr.data_length = rlen; - hdr.flags = SD_FLAG_CMD_IO_LOCAL; + hdr.flags = 0; hdr.epoch = sd_epoch; hdr.obj.oid = oid; @@ -1339,9 +1339,9 @@ static void *read_object_from(struct sd_ exit(EXIT_FAILURE); } - hdr.opcode = SD_OP_READ_OBJ; + hdr.opcode = SD_OP_READ_PEER; hdr.epoch = sd_epoch; - hdr.flags = SD_FLAG_CMD_IO_LOCAL; + hdr.flags = 0; hdr.data_length = rlen; hdr.obj.oid = oid; @@ -1378,9 +1378,9 @@ static void write_object_to(struct sd_vn exit(EXIT_FAILURE); } - hdr.opcode = SD_OP_WRITE_OBJ; + hdr.opcode = SD_OP_WRITE_PEER; hdr.epoch = sd_epoch; - hdr.flags = 
SD_FLAG_CMD_IO_LOCAL | SD_FLAG_CMD_WRITE; + hdr.flags = SD_FLAG_CMD_WRITE; hdr.data_length = wlen; hdr.obj.oid = oid; Index: sheepdog/include/internal_proto.h =================================================================== --- sheepdog.orig/include/internal_proto.h 2012-07-04 15:50:44.913702897 +0200 +++ sheepdog/include/internal_proto.h 2012-07-04 15:55:14.823701284 +0200 @@ -54,11 +54,12 @@ #define SD_OP_STAT_RECOVERY 0x97 #define SD_OP_FLUSH_DEL_CACHE 0x98 #define SD_OP_GET_OBJ_LIST 0xA1 -#define SD_OP_GET_EPOCH 0XA2 +#define SD_OP_GET_EPOCH 0xA2 +#define SD_OP_CREATE_AND_WRITE_PEER 0xa3 +#define SD_OP_READ_PEER 0xa4 +#define SD_OP_WRITE_PEER 0xa5 - - -#define SD_FLAG_CMD_IO_LOCAL 0x0010 +/* 0x0010 was SD_FLAG_CMD_IO_LOCAL */ #define SD_FLAG_CMD_RECOVERY 0x0020 /* set this flag when you want to read a VDI which is opened by Index: sheepdog/sheep/gateway.c =================================================================== --- sheepdog.orig/sheep/gateway.c 2012-07-04 15:55:13.750367959 +0200 +++ sheepdog/sheep/gateway.c 2012-07-04 15:59:06.757033233 +0200 @@ -31,16 +31,13 @@ static int forward_read_obj_req(struct r uint64_t oid = req->rq.obj.oid; int nr_copies, j; - memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr)); - fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL; - nr_copies = get_nr_copies(req->vnodes); oid_to_vnodes(req->vnodes, oid, nr_copies, obj_vnodes); for (i = 0; i < nr_copies; i++) { v = obj_vnodes[i]; if (!vnode_is_local(v)) continue; - ret = do_process_work(req); + ret = peer_read_obj(req); if (ret == SD_RES_SUCCESS) return ret; @@ -57,6 +54,9 @@ static int forward_read_obj_req(struct r struct sockfd *sfd; int idx = (i + j) % nr_copies; + memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr)); + fwd_hdr.opcode = SD_OP_READ_PEER; + v = obj_vnodes[idx]; if (vnode_is_local(v)) continue; @@ -88,11 +88,6 @@ static int forward_read_obj_req(struct r eprintf("remote read fail %x\n", ret); sheep_put_sockfd(&v->nid, sfd); } - if (i + 1 != nr_copies) { - /* Reset the hdr for next 
read */ - memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr)); - fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL; - } } return ret; } @@ -233,7 +228,10 @@ static int forward_write_obj_req(struct write_info_init(&wi); memcpy(&fwd_hdr, &req->rq, sizeof(fwd_hdr)); - fwd_hdr.flags |= SD_FLAG_CMD_IO_LOCAL; + if (req->rq.opcode == SD_OP_CREATE_AND_WRITE_OBJ) + fwd_hdr.opcode = SD_OP_CREATE_AND_WRITE_PEER; + else + fwd_hdr.opcode = SD_OP_WRITE_PEER; wlen = fwd_hdr.data_length; @@ -268,7 +266,11 @@ static int forward_write_obj_req(struct if (local != -1 && err_ret == SD_RES_SUCCESS) { v = obj_vnodes[local]; - ret = do_process_work(req); + if (req->rq.opcode == SD_OP_CREATE_AND_WRITE_OBJ) + ret = peer_create_and_write_obj(req); + else + ret = peer_write_obj(req); + if (ret != SD_RES_SUCCESS) { eprintf("fail to write local %"PRIx32"\n", ret); err_ret = ret; Index: sheepdog/sheep/ops.c =================================================================== --- sheepdog.orig/sheep/ops.c 2012-07-04 15:50:44.913702897 +0200 +++ sheepdog/sheep/ops.c 2012-07-04 15:58:07.413700254 +0200 @@ -29,7 +29,8 @@ enum sd_op_type { SD_OP_TYPE_CLUSTER = 1, /* cluster operations */ SD_OP_TYPE_LOCAL, /* local operations */ - SD_OP_TYPE_IO, /* io operations */ + SD_OP_TYPE_PEER, /* io operations */ + SD_OP_TYPE_GATEWAY, /* gateway operations */ }; struct sd_op_template { @@ -51,7 +52,7 @@ struct sd_op_template { * If type is SD_OP_TYPE_LOCAL, both process_work() and process_main() * will be called on the local node. * - * If type is SD_OP_TYPE_IO, only process_work() will be called, and it + * If type is SD_OP_TYPE_PEER, only process_work() will be called, and it * will be called on the local node. 
*/ int (*process_work)(struct request *req); @@ -613,8 +614,7 @@ static int read_copy_from_replica(struct wlen = 0; memset(&hdr, 0, sizeof(hdr)); - hdr.opcode = SD_OP_READ_OBJ; - hdr.flags = SD_FLAG_CMD_IO_LOCAL; + hdr.opcode = SD_OP_READ_PEER; hdr.epoch = epoch; hdr.data_length = rlen; @@ -650,7 +650,7 @@ static int store_remove_obj(struct reque return sd_store->remove_object(oid); } -static int store_read_obj(struct request *req) +int peer_read_obj(struct request *req) { struct sd_req *hdr = &req->rq; struct sd_rsp *rsp = &req->rp; @@ -703,7 +703,7 @@ static int do_write_obj(struct siocb *io return ret; } -static int store_write_obj(struct request *req) +int peer_write_obj(struct request *req) { struct sd_req *hdr = &req->rq; uint32_t epoch = hdr->epoch; @@ -715,7 +715,7 @@ static int store_write_obj(struct reques return do_write_obj(&iocb, hdr, epoch, req->data, 0); } -static int store_create_and_write_obj(struct request *req) +int peer_create_and_write_obj(struct request *req) { struct sd_req *hdr = &req->rq; struct sd_req cow_hdr; @@ -918,26 +918,39 @@ static struct sd_op_template sd_ops[] = .process_main = local_trace_cat_ops, }, - /* I/O operations */ + /* gateway I/O operations */ [SD_OP_CREATE_AND_WRITE_OBJ] = { - .type = SD_OP_TYPE_IO, - .process_work = store_create_and_write_obj, + .type = SD_OP_TYPE_GATEWAY, }, [SD_OP_READ_OBJ] = { - .type = SD_OP_TYPE_IO, - .process_work = store_read_obj, + .type = SD_OP_TYPE_GATEWAY, }, [SD_OP_WRITE_OBJ] = { - .type = SD_OP_TYPE_IO, - .process_work = store_write_obj, + .type = SD_OP_TYPE_GATEWAY, }, + /* peer I/O operations */ [SD_OP_REMOVE_OBJ] = { - .type = SD_OP_TYPE_IO, + .type = SD_OP_TYPE_PEER, .process_work = store_remove_obj, }, + + [SD_OP_CREATE_AND_WRITE_PEER] = { + .type = SD_OP_TYPE_PEER, + .process_work = peer_create_and_write_obj, + }, + + [SD_OP_READ_PEER] = { + .type = SD_OP_TYPE_PEER, + .process_work = peer_read_obj, + }, + + [SD_OP_WRITE_PEER] = { + .type = SD_OP_TYPE_PEER, + .process_work = 
peer_write_obj, + }, }; struct sd_op_template *get_sd_op(uint8_t opcode) @@ -958,9 +971,14 @@ int is_local_op(struct sd_op_template *o return op->type == SD_OP_TYPE_LOCAL; } -int is_io_op(struct sd_op_template *op) +int is_peer_op(struct sd_op_template *op) +{ + return op->type == SD_OP_TYPE_PEER; +} + +int is_gateway_op(struct sd_op_template *op) { - return op->type == SD_OP_TYPE_IO; + return op->type == SD_OP_TYPE_GATEWAY; } int is_force_op(struct sd_op_template *op) Index: sheepdog/sheep/recovery.c =================================================================== --- sheepdog.orig/sheep/recovery.c 2012-07-04 15:50:44.913702897 +0200 +++ sheepdog/sheep/recovery.c 2012-07-04 15:55:14.823701284 +0200 @@ -103,9 +103,9 @@ static int recover_object_from_replica(u } memset(&hdr, 0, sizeof(hdr)); - hdr.opcode = SD_OP_READ_OBJ; + hdr.opcode = SD_OP_READ_PEER; hdr.epoch = epoch; - hdr.flags = SD_FLAG_CMD_RECOVERY | SD_FLAG_CMD_IO_LOCAL; + hdr.flags = SD_FLAG_CMD_RECOVERY; hdr.data_length = rlen; hdr.obj.oid = oid; Index: sheepdog/sheep/sdnet.c =================================================================== --- sheepdog.orig/sheep/sdnet.c 2012-07-04 15:50:44.913702897 +0200 +++ sheepdog/sheep/sdnet.c 2012-07-04 15:56:17.503700912 +0200 @@ -181,14 +181,14 @@ void resume_wait_epoch_requests(void) * Gateway retries to send the request when * its epoch changes. */ - assert(!(req->rq.flags & SD_FLAG_CMD_IO_LOCAL)); + assert(is_gateway_op(req->op)); req->rq.epoch = sys->epoch; list_del(&req->request_list); requeue_request(req); break; case SD_RES_NEW_NODE_VER: /* Peer retries the request locally when its epoch changes. 
*/ - assert(req->rq.flags & SD_FLAG_CMD_IO_LOCAL); + assert(!is_gateway_op(req->op)); list_del(&req->request_list); requeue_request(req); break; @@ -252,7 +252,7 @@ void flush_wait_obj_requests(void) } } -static void queue_io_request(struct request *req) +static void queue_peer_request(struct request *req) { req->local_oid = req->rq.obj.oid; if (req->local_oid) { @@ -340,15 +340,6 @@ static void queue_request(struct request } /* - * we set epoch for non direct requests here. Note that we need to - * sample sys->epoch before passing requests to worker threads as - * it can change anytime we return to processing membership change - * events. - */ - if (!(hdr->flags & SD_FLAG_CMD_IO_LOCAL)) - hdr->epoch = sys->epoch; - - /* * force operations shouldn't access req->vnodes in their * process_work() and process_main() because they can be * called before we set up current_vnode_info @@ -356,14 +347,16 @@ static void queue_request(struct request if (!is_force_op(req->op)) req->vnodes = get_vnode_info(); - if (is_io_op(req->op)) { - if (req->rq.flags & SD_FLAG_CMD_IO_LOCAL) - queue_io_request(req); - else - queue_gateway_request(req); + if (is_peer_op(req->op)) { + queue_peer_request(req); + } else if (is_gateway_op(req->op)) { + hdr->epoch = sys->epoch; + queue_gateway_request(req); } else if (is_local_op(req->op)) { + hdr->epoch = sys->epoch; queue_local_request(req); } else if (is_cluster_op(req->op)) { + hdr->epoch = sys->epoch; queue_cluster_request(req); } else { eprintf("unknown operation %d\n", hdr->opcode); Index: sheepdog/sheep/sheep_priv.h =================================================================== --- sheepdog.orig/sheep/sheep_priv.h 2012-07-04 15:55:13.750367959 +0200 +++ sheepdog/sheep/sheep_priv.h 2012-07-04 15:58:29.173700123 +0200 @@ -288,7 +288,8 @@ void put_request(struct request *req); struct sd_op_template *get_sd_op(uint8_t opcode); int is_cluster_op(struct sd_op_template *op); int is_local_op(struct sd_op_template *op); -int is_io_op(struct 
sd_op_template *op); +int is_peer_op(struct sd_op_template *op); +int is_gateway_op(struct sd_op_template *op); int is_force_op(struct sd_op_template *op); int has_process_work(struct sd_op_template *op); int has_process_main(struct sd_op_template *op); @@ -366,6 +367,11 @@ static inline int sys_can_halt(void) return sys_stat_ok() && !sys_flag_nohalt(); } +/* backend store */ +int peer_read_obj(struct request *req); +int peer_write_obj(struct request *req); +int peer_create_and_write_obj(struct request *req); + /* object_cache */ int bypass_object_cache(struct request *req); -- sheepdog mailing list [email protected] http://lists.wpkg.org/mailman/listinfo/sheepdog
