From: Liu Yuan <[email protected]> ops.c has become more and more bloated, so let's move the peer operations out into a new file, sheep/peer.c, to keep it manageable.
Signed-off-by: Liu Yuan <[email protected]> --- NOTE(review): this is intended as a pure code move; the moved function bodies appear unchanged, except that peer.c drops the blank line that separated read_copy_from_replica() from peer_remove_obj() in ops.c. Pre-existing issues in the moved code, to be fixed in a follow-up patch rather than in this move: (1) peer_create_and_write_obj() returns SD_RES_SUCCESS when valloc() fails, because ret still holds its initial value when it jumps to out; (2) read_copy_from_replica() returns rsp->result even when no SD_OP_READ_PEER request was sent (e.g. every connect_to() fails, or the only copy is the local one whose read failed), and hdr is never initialized on that path; (3) the data_length computed in peer_create_and_write_obj() is dead, since do_write_obj() overwrites iocb->length; (4) do_write_obj() never uses its epoch parameter. sheep/Makefile.am | 3 +- sheep/ops.c | 216 -------------------------------------------------- sheep/peer.c | 229 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 231 insertions(+), 217 deletions(-) create mode 100644 sheep/peer.c diff --git a/sheep/Makefile.am b/sheep/Makefile.am index 0a874c6..85d89f6 100644 --- a/sheep/Makefile.am +++ b/sheep/Makefile.am @@ -26,7 +26,8 @@ sbin_PROGRAMS = sheep sheep_SOURCES = sheep.c group.c sdnet.c gateway.c store.c vdi.c work.c \ journal.c ops.c recovery.c cluster/local.c \ - object_cache.c object_list_cache.c sockfd_cache.c + object_cache.c object_list_cache.c sockfd_cache.c \ + peer.c if BUILD_COROSYNC sheep_SOURCES += cluster/corosync.c diff --git a/sheep/ops.c b/sheep/ops.c index 0da1bbd..54cb346 100644 --- a/sheep/ops.c +++ b/sheep/ops.c @@ -546,222 +546,6 @@ static int local_trace_cat_ops(const struct sd_req *req, struct sd_rsp *rsp, voi return SD_RES_SUCCESS; } -static int read_copy_from_replica(struct vnode_info *vnodes, uint32_t epoch, - uint64_t oid, char *buf) -{ - int i, j, nr_copies, ret; - struct sd_req hdr; - struct sd_rsp *rsp = (struct sd_rsp *)&hdr; - struct sd_vnode *obj_vnodes[SD_MAX_COPIES]; - struct sd_vnode *v; - char name[128]; - int rounded_rand, local = -1; - - nr_copies = get_nr_copies(vnodes); - oid_to_vnodes(vnodes, oid, nr_copies, obj_vnodes); - - /* first try to read from local copy */ - for (i = 0; i < nr_copies; i++) { - struct siocb iocb; - - v = obj_vnodes[i]; - addr_to_str(name, sizeof(name), v->nid.addr, 0); - - if (vnode_is_local(v)) { - memset(&iocb, 0, sizeof(iocb)); - iocb.epoch = epoch; - iocb.buf = buf; - iocb.length = SD_DATA_OBJ_SIZE; - iocb.offset = 0; - ret = sd_store->read(oid, &iocb); - if (ret != SD_RES_SUCCESS) { - local = i; - break; - } - goto out; - } - } - - /* then read random copy from cluster for better load balance */ - rounded_rand = random() % nr_copies; - - for (i = 0; i < nr_copies; i++) { - 
unsigned wlen, rlen; - int fd; - - j = (i + rounded_rand) % nr_copies; - - /* bypass the local copy */ - if (local == j) - continue; - - v = obj_vnodes[j]; - addr_to_str(name, sizeof(name), v->nid.addr, 0); - - fd = connect_to(name, v->nid.port); - if (fd < 0) - continue; - - rlen = SD_DATA_OBJ_SIZE; - wlen = 0; - - sd_init_req(&hdr, SD_OP_READ_PEER); - hdr.epoch = epoch; - hdr.data_length = rlen; - - hdr.obj.oid = oid; - hdr.obj.offset = 0; - - ret = exec_req(fd, &hdr, buf, &wlen, &rlen); - - close(fd); - - if (ret) { - dprintf("%x, %x\n", ret, rsp->result); - continue; - } - - if (rsp->result == SD_RES_SUCCESS) - break; - } - - ret = rsp->result; - dprintf("%"PRIx64" ret:%x\n", oid, ret); -out: - return ret; -} - -int peer_remove_obj(struct request *req) -{ - uint64_t oid = req->rq.obj.oid; - - objlist_cache_remove(oid); - object_cache_remove(oid); - - return sd_store->remove_object(oid); -} - -int peer_read_obj(struct request *req) -{ - struct sd_req *hdr = &req->rq; - struct sd_rsp *rsp = &req->rp; - int ret; - uint32_t epoch = hdr->epoch; - struct siocb iocb; - - memset(&iocb, 0, sizeof(iocb)); - iocb.epoch = epoch; - iocb.flags = hdr->flags; - iocb.buf = req->data; - iocb.length = hdr->data_length; - iocb.offset = hdr->obj.offset; - ret = sd_store->read(hdr->obj.oid, &iocb); - if (ret != SD_RES_SUCCESS) - goto out; - - rsp->data_length = hdr->data_length; - rsp->obj.copies = sys->nr_copies; -out: - return ret; -} - -static int do_write_obj(struct siocb *iocb, struct sd_req *hdr, uint32_t epoch, - void *data, int create) -{ - uint64_t oid = hdr->obj.oid; - int ret = SD_RES_SUCCESS; - void *jd = NULL; - - iocb->buf = data; - iocb->length = hdr->data_length; - iocb->offset = hdr->obj.offset; - if (is_vdi_obj(oid)) { - struct strbuf buf = STRBUF_INIT; - - strbuf_addf(&buf, "%s%016" PRIx64, obj_path, oid); - jd = jrnl_begin(data, hdr->data_length, hdr->obj.offset, - buf.buf, jrnl_path); - if (!jd) { - strbuf_release(&buf); - return SD_RES_EIO; - } - ret = 
sd_store->write(oid, iocb, create); - jrnl_end(jd); - strbuf_release(&buf); - } else - ret = sd_store->write(oid, iocb, create); - - return ret; -} - -int peer_write_obj(struct request *req) -{ - struct sd_req *hdr = &req->rq; - uint32_t epoch = hdr->epoch; - struct siocb iocb; - - memset(&iocb, 0, sizeof(iocb)); - iocb.epoch = epoch; - iocb.flags = hdr->flags; - return do_write_obj(&iocb, hdr, epoch, req->data, 0); -} - -int peer_create_and_write_obj(struct request *req) -{ - struct sd_req *hdr = &req->rq; - struct sd_req cow_hdr; - uint32_t epoch = hdr->epoch; - uint64_t oid = hdr->obj.oid; - char *buf = NULL; - struct siocb iocb; - unsigned data_length; - int ret = SD_RES_SUCCESS; - - if (is_vdi_obj(oid)) - data_length = SD_INODE_SIZE; - else if (is_vdi_attr_obj(oid)) - data_length = SD_ATTR_OBJ_SIZE; - else - data_length = SD_DATA_OBJ_SIZE; - - memset(&iocb, 0, sizeof(iocb)); - iocb.epoch = epoch; - iocb.flags = hdr->flags; - iocb.length = data_length; - if (hdr->flags & SD_FLAG_CMD_COW) { - dprintf("%" PRIx64 ", %" PRIx64 "\n", oid, hdr->obj.cow_oid); - - buf = valloc(SD_DATA_OBJ_SIZE); - if (!buf) { - eprintf("can not allocate memory\n"); - goto out; - } - if (hdr->data_length != SD_DATA_OBJ_SIZE) { - ret = read_copy_from_replica(req->vnodes, hdr->epoch, - hdr->obj.cow_oid, buf); - if (ret != SD_RES_SUCCESS) { - eprintf("failed to read cow object\n"); - goto out; - } - } - - memcpy(buf + hdr->obj.offset, req->data, hdr->data_length); - memcpy(&cow_hdr, hdr, sizeof(cow_hdr)); - cow_hdr.data_length = SD_DATA_OBJ_SIZE; - cow_hdr.obj.offset = 0; - - ret = do_write_obj(&iocb, &cow_hdr, epoch, buf, 1); - } else - ret = do_write_obj(&iocb, hdr, epoch, req->data, 1); - - if (SD_RES_SUCCESS == ret) - objlist_cache_insert(oid); -out: - if (buf) - free(buf); - return ret; -} - static struct sd_op_template sd_ops[] = { /* cluster operations */ diff --git a/sheep/peer.c b/sheep/peer.c new file mode 100644 index 0000000..d25ee72 --- /dev/null +++ b/sheep/peer.c @@ -0,0 
+1,229 @@ +/* + * Copyright (C) 2009-2011 Nippon Telegraph and Telephone Corporation. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License version + * 2 as published by the Free Software Foundation. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "sheep_priv.h" +#include "strbuf.h" +#include "trace/trace.h" + +static int read_copy_from_replica(struct vnode_info *vnodes, uint32_t epoch, + uint64_t oid, char *buf) +{ + int i, j, nr_copies, ret; + struct sd_req hdr; + struct sd_rsp *rsp = (struct sd_rsp *)&hdr; + struct sd_vnode *obj_vnodes[SD_MAX_COPIES]; + struct sd_vnode *v; + char name[128]; + int rounded_rand, local = -1; + + nr_copies = get_nr_copies(vnodes); + oid_to_vnodes(vnodes, oid, nr_copies, obj_vnodes); + + /* first try to read from local copy */ + for (i = 0; i < nr_copies; i++) { + struct siocb iocb; + + v = obj_vnodes[i]; + addr_to_str(name, sizeof(name), v->nid.addr, 0); + + if (vnode_is_local(v)) { + memset(&iocb, 0, sizeof(iocb)); + iocb.epoch = epoch; + iocb.buf = buf; + iocb.length = SD_DATA_OBJ_SIZE; + iocb.offset = 0; + ret = sd_store->read(oid, &iocb); + if (ret != SD_RES_SUCCESS) { + local = i; + break; + } + goto out; + } + } + + /* then read random copy from cluster for better load balance */ + rounded_rand = random() % nr_copies; + + for (i = 0; i < nr_copies; i++) { + unsigned wlen, rlen; + int fd; + + j = (i + rounded_rand) % nr_copies; + + /* bypass the local copy */ + if (local == j) + continue; + + v = obj_vnodes[j]; + addr_to_str(name, sizeof(name), v->nid.addr, 0); + + fd = connect_to(name, v->nid.port); + if (fd < 0) + continue; + + rlen = SD_DATA_OBJ_SIZE; + wlen = 0; + + sd_init_req(&hdr, SD_OP_READ_PEER); + hdr.epoch = epoch; + hdr.data_length = rlen; + + hdr.obj.oid = oid; + hdr.obj.offset = 0; + + ret = exec_req(fd, &hdr, buf, &wlen, 
&rlen); + + close(fd); + + if (ret) { + dprintf("%x, %x\n", ret, rsp->result); + continue; + } + + if (rsp->result == SD_RES_SUCCESS) + break; + } + + ret = rsp->result; + dprintf("%"PRIx64" ret:%x\n", oid, ret); +out: + return ret; +} +int peer_remove_obj(struct request *req) +{ + uint64_t oid = req->rq.obj.oid; + + objlist_cache_remove(oid); + object_cache_remove(oid); + + return sd_store->remove_object(oid); +} + +int peer_read_obj(struct request *req) +{ + struct sd_req *hdr = &req->rq; + struct sd_rsp *rsp = &req->rp; + int ret; + uint32_t epoch = hdr->epoch; + struct siocb iocb; + + memset(&iocb, 0, sizeof(iocb)); + iocb.epoch = epoch; + iocb.flags = hdr->flags; + iocb.buf = req->data; + iocb.length = hdr->data_length; + iocb.offset = hdr->obj.offset; + ret = sd_store->read(hdr->obj.oid, &iocb); + if (ret != SD_RES_SUCCESS) + goto out; + + rsp->data_length = hdr->data_length; + rsp->obj.copies = sys->nr_copies; +out: + return ret; +} + +static int do_write_obj(struct siocb *iocb, struct sd_req *hdr, uint32_t epoch, + void *data, int create) +{ + uint64_t oid = hdr->obj.oid; + int ret = SD_RES_SUCCESS; + void *jd = NULL; + + iocb->buf = data; + iocb->length = hdr->data_length; + iocb->offset = hdr->obj.offset; + if (is_vdi_obj(oid)) { + struct strbuf buf = STRBUF_INIT; + + strbuf_addf(&buf, "%s%016" PRIx64, obj_path, oid); + jd = jrnl_begin(data, hdr->data_length, hdr->obj.offset, + buf.buf, jrnl_path); + if (!jd) { + strbuf_release(&buf); + return SD_RES_EIO; + } + ret = sd_store->write(oid, iocb, create); + jrnl_end(jd); + strbuf_release(&buf); + } else + ret = sd_store->write(oid, iocb, create); + + return ret; +} + +int peer_write_obj(struct request *req) +{ + struct sd_req *hdr = &req->rq; + uint32_t epoch = hdr->epoch; + struct siocb iocb; + + memset(&iocb, 0, sizeof(iocb)); + iocb.epoch = epoch; + iocb.flags = hdr->flags; + return do_write_obj(&iocb, hdr, epoch, req->data, 0); +} + +int peer_create_and_write_obj(struct request *req) +{ + struct sd_req 
*hdr = &req->rq; + struct sd_req cow_hdr; + uint32_t epoch = hdr->epoch; + uint64_t oid = hdr->obj.oid; + char *buf = NULL; + struct siocb iocb; + unsigned data_length; + int ret = SD_RES_SUCCESS; + + if (is_vdi_obj(oid)) + data_length = SD_INODE_SIZE; + else if (is_vdi_attr_obj(oid)) + data_length = SD_ATTR_OBJ_SIZE; + else + data_length = SD_DATA_OBJ_SIZE; + + memset(&iocb, 0, sizeof(iocb)); + iocb.epoch = epoch; + iocb.flags = hdr->flags; + iocb.length = data_length; + if (hdr->flags & SD_FLAG_CMD_COW) { + dprintf("%" PRIx64 ", %" PRIx64 "\n", oid, hdr->obj.cow_oid); + + buf = valloc(SD_DATA_OBJ_SIZE); + if (!buf) { + eprintf("can not allocate memory\n"); + goto out; + } + if (hdr->data_length != SD_DATA_OBJ_SIZE) { + ret = read_copy_from_replica(req->vnodes, hdr->epoch, + hdr->obj.cow_oid, buf); + if (ret != SD_RES_SUCCESS) { + eprintf("failed to read cow object\n"); + goto out; + } + } + + memcpy(buf + hdr->obj.offset, req->data, hdr->data_length); + memcpy(&cow_hdr, hdr, sizeof(cow_hdr)); + cow_hdr.data_length = SD_DATA_OBJ_SIZE; + cow_hdr.obj.offset = 0; + + ret = do_write_obj(&iocb, &cow_hdr, epoch, buf, 1); + } else + ret = do_write_obj(&iocb, hdr, epoch, req->data, 1); + + if (SD_RES_SUCCESS == ret) + objlist_cache_insert(oid); +out: + if (buf) + free(buf); + return ret; +} -- 1.7.10.2 -- sheepdog mailing list [email protected] http://lists.wpkg.org/mailman/listinfo/sheepdog
