From: Liu Yuan <[email protected]>

The current recovery logic is convoluted and hard to understand. This
patch splits it into smaller, single-purpose helpers so that it is
easier to follow.

Signed-off-by: Liu Yuan <[email protected]>
---
 sheep/store.c |  356 ++++++++++++++++++++++++++++++--------------------------
 1 files changed, 191 insertions(+), 165 deletions(-)

diff --git a/sheep/store.c b/sheep/store.c
index fed74f8..409840c 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -1139,81 +1139,80 @@ static int find_tgt_node(struct 
sheepdog_vnode_list_entry *old_entry,
        return -1;
 }
 
-static int __recover_one(struct recovery_work *rw,
-                        struct sheepdog_vnode_list_entry *_old_entry,
-                        int old_nr, int old_copies,
-                        struct sheepdog_vnode_list_entry *_cur_entry,
-                        int cur_nr, int cur_copies, int cur_idx,
-                        int copy_idx, uint32_t epoch, uint32_t tgt_epoch,
-                        uint64_t oid, char *buf, int buf_len)
+static void *alloc_buffer_for(uint64_t oid)
+{
+       /* Size the buffer by object type; data objects are page-aligned. */
+       if (is_vdi_obj(oid))
+               return xmalloc(sizeof(struct sheepdog_inode));
+
+       if (is_vdi_attr_obj(oid))
+               return xmalloc(SD_MAX_VDI_ATTR_VALUE_LEN);
+
+       if (is_data_obj(oid))
+               return valloc(SD_DATA_OBJ_SIZE);
+
+       /* any other object type gets a full data-object sized buffer */
+       return xmalloc(SD_DATA_OBJ_SIZE);
+}
+
+static void *get_vnodes_from_epoch(int epoch, int *nr)
+{
+       int nodes_nr, len = sizeof(struct sheepdog_vnode_list_entry) * SD_MAX_VNODES;
+       struct sheepdog_node_list_entry nodes[SD_MAX_NODES];
+       void *buf = xmalloc(len);
+
+       /* local epoch log first, then a remote copy; lengths are in bytes */
+       nodes_nr = epoch_log_read_nr(epoch, (void *)nodes, sizeof(nodes));
+       if (nodes_nr < 0) {
+               nodes_nr = epoch_log_read_remote(epoch, (void *)nodes, sizeof(nodes));
+               if (nodes_nr <= 0) {
+                       free(buf);
+                       return NULL;
+               }
+               nodes_nr /= sizeof(nodes[0]);
+       }
+       *nr = nodes_to_vnodes(nodes, nodes_nr, buf);
+       return buf;
+}
+
+static int recover_object_from_replica(uint64_t oid,
+               struct sheepdog_vnode_list_entry *entry,
+               int epoch, int tgt_epoch)
 {
-       struct sheepdog_vnode_list_entry *e;
        struct sd_obj_req hdr;
        struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
        char name[128];
        unsigned wlen = 0, rlen;
        int fd, ret;
-       struct sheepdog_vnode_list_entry *old_entry, *cur_entry, *next_entry;
-       int next_nr, next_copies;
-       int tgt_idx = -1;
-       int old_idx;
-
-       old_entry = malloc(sizeof(*old_entry) * SD_MAX_VNODES);
-       cur_entry = malloc(sizeof(*cur_entry) * SD_MAX_VNODES);
-       next_entry = malloc(sizeof(*next_entry) * SD_MAX_VNODES);
-       if (!old_entry || !cur_entry || !next_entry) {
-               eprintf("failed to allocate memory\n");
-               goto err;
-       }
-
-       memcpy(old_entry, _old_entry, sizeof(*old_entry) * old_nr);
-       memcpy(cur_entry, _cur_entry, sizeof(*cur_entry) * cur_nr);
-next:
-       dprintf("recover object %"PRIx64" from epoch %"PRIu32"\n", oid, 
tgt_epoch);
-       old_idx = obj_to_sheep(old_entry, old_nr, oid, 0);
+       void *buf;
 
-       tgt_idx = find_tgt_node(old_entry, old_nr, old_idx, old_copies,
-                               cur_entry, cur_nr, cur_idx, cur_copies, 
copy_idx);
-       if (tgt_idx < 0) {
-               eprintf("cannot find target node %"PRIx64"\n", oid);
-               goto err;
+       buf = alloc_buffer_for(oid);
+       if (!buf) {
+               eprintf("out of memory\n");
+               return -1;
        }
-       e = old_entry + tgt_idx;
 
-       if (is_myself(e->addr, e->port)) {
+       if (is_myself(entry->addr, entry->port)) {
                struct siocb iocb = { 0 };
 
                iocb.epoch = epoch;
                ret = store.link(oid, &iocb, tgt_epoch);
-               if (ret == SD_RES_SUCCESS)
+               if (ret == SD_RES_SUCCESS) {
+                       ret = 0;
+                       goto done;
+               } else {
+                       ret = -1;
                        goto out;
-
-               if (ret == SD_RES_NO_OBJ) {
-                       next_nr = epoch_log_read(tgt_epoch - 1, buf, buf_len);
-                       if (next_nr <= 0) {
-                               eprintf("no previous epoch: %"PRIu32"\n", 
tgt_epoch - 1);
-                               goto err;
-                       }
-                       next_nr /= sizeof(struct sheepdog_node_list_entry);
-                       next_copies = get_max_copies((struct 
sheepdog_node_list_entry *)buf,
-                                                    next_nr);
-                       next_nr = nodes_to_vnodes((struct 
sheepdog_node_list_entry *)buf,
-                                                 next_nr, next_entry);
-                       goto not_found;
                }
-
-               eprintf("Cannot recover from local store for %"PRIx64"\n", oid);
-               goto err;
        }
 
-       addr_to_str(name, sizeof(name), e->addr, 0);
-
-       fd = connect_to(name, e->port);
+       addr_to_str(name, sizeof(name), entry->addr, 0);
+       fd = connect_to(name, entry->port);
        if (fd < 0) {
-               eprintf("failed to connect to %s:%"PRIu32"\n", name, e->port);
-               goto err;
+               eprintf("failed to connect to %s:%"PRIu32"\n", name, 
entry->port);
+               ret = -1;
+               goto out;
        }
-
        if (is_vdi_obj(oid))
                rlen = sizeof(struct sheepdog_inode);
        else if (is_vdi_attr_obj(oid))
@@ -1234,8 +1233,9 @@ next:
        close(fd);
 
        if (ret != 0) {
-               eprintf("%"PRIu32"\n", rsp->result);
-               goto err;
+               eprintf("res: %"PRIx32"\n", rsp->result);
+               ret = -1;
+               goto out;
        }
 
        rsp = (struct sd_obj_rsp *)&hdr;
@@ -1245,20 +1245,23 @@ next:
                int flags = O_DSYNC | O_RDWR | O_CREAT;

                snprintf(path, sizeof(path), "%s%08u/%016" PRIx64, obj_path,
-                        epoch, oid);
+                               epoch, oid);
                snprintf(tmp_path, sizeof(tmp_path), "%s%08u/%016" PRIx64 ".tmp",
-                        obj_path, epoch, oid);
+                               obj_path, epoch, oid);

                fd = open(tmp_path, flags, def_fmode);
                if (fd < 0) {
                        eprintf("failed to open %s: %m\n", tmp_path);
-                       goto err;
+                       ret = -1;
+                       goto out;
                }

                ret = write(fd, buf, rlen);
                if (ret != rlen) {
                        eprintf("failed to write object\n");
-                       goto err;
+                       close(fd);
+                       ret = -1;
+                       goto out;
                }

                close(fd);
@@ -1267,137 +1269,161 @@ next:
                ret = rename(tmp_path, path);
                if (ret < 0) {
                        eprintf("failed to rename %s to %s: %m\n", tmp_path, 
path);
-                       goto err;
+                       ret = -1;
+                       goto out;
                }
-               dprintf("recovered oid %"PRIx64" to epoch %"PRIu32"\n", oid, 
epoch);
+       } else if (rsp->result == SD_RES_NEW_NODE_VER ||
+                       rsp->result == SD_RES_OLD_NODE_VER ||
+                       rsp->result == SD_RES_NETWORK_ERROR) {
+               dprintf("retrying: %"PRIx32", %"PRIx64"\n", rsp->result, oid);
+               ret = 1;
                goto out;
-       }
-
-       if (rsp->result == SD_RES_NEW_NODE_VER || rsp->result == 
SD_RES_OLD_NODE_VER
-           || rsp->result == SD_RES_NETWORK_ERROR) {
-               eprintf("retrying: %"PRIu32", %"PRIx64"\n", rsp->result, oid);
-               rw->retry = 1;
+       } else {
+               eprintf("failed, res: %"PRIx32"\n", rsp->result);
+               ret = -1;
                goto out;
        }
+done:
+       dprintf("recovered oid %"PRIx64" from %d to epoch %d\n", oid, 
tgt_epoch, epoch);
+out:
+       free(buf);
+       return ret;
+}
 
-       if (rsp->result != SD_RES_NO_OBJ || rsp->data_length == 0) {
-               eprintf("%"PRIu32"\n", rsp->result);
-               goto err;
-       }
-       next_nr = rsp->data_length / sizeof(struct sheepdog_node_list_entry);
-       next_copies = get_max_copies((struct sheepdog_node_list_entry *)buf, 
next_nr);
-       next_nr = nodes_to_vnodes((struct sheepdog_node_list_entry *)buf,
-                                 next_nr, next_entry);
+static void rollback_old_cur(struct sheepdog_vnode_list_entry *old, int *old_nr,
+               struct sheepdog_vnode_list_entry *cur, int *cur_nr,
+               struct sheepdog_vnode_list_entry *new_old, int new_old_nr)
+{
+       /* Step both rings one epoch back: old -> cur, new_old -> old. */

-not_found:
-       for (copy_idx = 0; copy_idx < old_copies; copy_idx++)
-               if (get_nth_node(old_entry, old_nr, old_idx, copy_idx) == tgt_idx)
-                       break;
-       if (copy_idx == old_copies) {
-               eprintf("bug: cannot find the proper copy_idx\n");
+       *cur_nr = *old_nr;
+       memcpy(cur, old, sizeof(*cur) * *cur_nr);
+       *old_nr = new_old_nr;
+       memcpy(old, new_old, sizeof(*old) * new_old_nr);
+}
+
+/*
+ * Recover the object by following its track through the epoch history,
+ * asking the node that held it on the consistent hash ring in each older
+ * epoch in turn. Returns 0 on success or scheduled retry, -1 on failure.
+ */
+static int do_recover_object(struct recovery_work *rw, int copy_idx)
+{
+       struct sheepdog_vnode_list_entry *old, *cur;
+       uint64_t oid = rw->oids[rw->done];
+       int old_nr = rw->old_nr_vnodes, cur_nr = rw->cur_nr_vnodes;
+       int epoch = rw->epoch, tgt_epoch = rw->epoch - 1;
+       struct sheepdog_vnode_list_entry *tgt_entry;
+       int old_idx, cur_idx, tgt_idx, old_copies, cur_copies, ret;
+
+       old = xmalloc(sizeof(*old) * SD_MAX_VNODES);
+       cur = xmalloc(sizeof(*cur) * SD_MAX_VNODES);
+       memcpy(old, rw->old_vnodes, sizeof(*old) * old_nr);
+       memcpy(cur, rw->cur_vnodes, sizeof(*cur) * cur_nr);
+       old_copies = get_max_copies(rw->old_nodes, rw->old_nr_nodes);
+       cur_copies = get_max_copies(rw->cur_nodes, rw->cur_nr_nodes);
+
+again:
+       old_idx = obj_to_sheep(old, old_nr, oid, 0);
+       cur_idx = obj_to_sheep(cur, cur_nr, oid, 0);
+       dprintf("try recover object %"PRIx64" from epoch %d\n", oid, tgt_epoch);
+
+       tgt_idx = find_tgt_node(old, old_nr, old_idx, old_copies,
+                       cur, cur_nr, cur_idx, cur_copies, copy_idx);
+       if (tgt_idx < 0) {
+               eprintf("cannot find target node %"PRIx64"\n", oid);
+               ret = -1;
                goto err;
        }
+       tgt_entry = old + tgt_idx;

-       dprintf("%"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32", %"PRIu32"\n", rsp->result, rsp->data_length, tgt_idx,
-               old_idx, old_nr, copy_idx);
-       memcpy(cur_entry, old_entry, sizeof(*old_entry) * old_nr);
-       cur_copies = old_copies;
-       cur_nr = old_nr;
-       cur_idx = old_idx;
+       ret = recover_object_from_replica(oid, tgt_entry, epoch, tgt_epoch);
+       if (ret < 0) {
+               struct sheepdog_vnode_list_entry *new_old;
+               int new_old_nr;

-       memcpy(old_entry, next_entry, next_nr * sizeof(*next_entry));
-       old_copies = next_copies;
-       old_nr = next_nr;
+               tgt_epoch--;
+               if (tgt_epoch < 1) {
+                       eprintf("can not recover oid %"PRIx64"\n", oid);
+                       ret = -1;
+                       goto err;
+               }

-       tgt_epoch--;
-       goto next;
-out:
-       free(old_entry);
-       free(cur_entry);
-       free(next_entry);
-       return 0;
+               new_old = get_vnodes_from_epoch(tgt_epoch, &new_old_nr);
+               if (!new_old) {
+                       ret = -1;
+                       goto err;
+               }
+               rollback_old_cur(old, &old_nr, cur, &cur_nr,
+                               new_old, new_old_nr);
+               free(new_old);
+               /* cur now holds the old ring; TODO: refresh old_copies too */
+               cur_copies = old_copies;
+               goto again;
+       } else if (ret > 0) {
+               ret = 0;
+               rw->retry = 1;
+       }
 err:
-       free(old_entry);
-       free(cur_entry);
-       free(next_entry);
-       return -1;
+       free(old);
+       free(cur);
+       return ret;
 }
 
-static void recover_one(struct work *work, int idx)
+static int get_replica_idx(struct recovery_work *rw, uint64_t oid, int *copy_nr)
+{
+       int copy;
+
+       /* Set *copy_nr; return which copy of oid maps to this node, or -1. */
+       *copy_nr = get_max_copies(rw->cur_nodes, rw->cur_nr_nodes);
+       for (copy = 0; copy < *copy_nr; copy++) {
+               int n = obj_to_sheep(rw->cur_vnodes, rw->cur_nr_vnodes, oid, copy);
+               if (is_myself(rw->cur_vnodes[n].addr, rw->cur_vnodes[n].port))
+                       return copy;
+       }
+       return -1;
+}
+
+static void recover_object(struct work *work, int idx)
 {
        struct recovery_work *rw = container_of(work, struct recovery_work, work);
-       char *buf = NULL;
-       int ret;
        uint64_t oid = rw->oids[rw->done];
-       int old_copies, cur_copies;
        uint32_t epoch = rw->epoch;
-       int i, copy_idx = 0, cur_idx = -1;
-       struct siocb iocb;
+       int i, copy_idx, copy_nr, ret;
+       struct siocb iocb = { 0 };

-       eprintf("%"PRIu32" %"PRIu32", %16"PRIx64"\n", rw->done, rw->count, oid);
+       if (!sys->nr_sobjs)
+               return;
+
+       eprintf("done:%"PRIu32" count:%"PRIu32", oid:%"PRIx64"\n", rw->done, rw->count, oid);

        memset(&iocb, 0, sizeof(iocb));
        iocb.epoch = epoch;
        ret = store.open(oid, &iocb, 0);
        if (ret == SD_RES_SUCCESS) {
-               /* the object is already recovered */
                store.close(oid, &iocb);
-               goto out;
+               dprintf("the object is already recovered\n");
+               return;
        }

-       if (is_vdi_obj(oid))
-               buf = malloc(sizeof(struct sheepdog_inode));
-       else if (is_vdi_attr_obj(oid))
-               buf = malloc(SD_MAX_VDI_ATTR_VALUE_LEN);
-       else if (is_data_obj(oid))
-               buf = valloc(SD_DATA_OBJ_SIZE);
-       else
-               buf = malloc(SD_DATA_OBJ_SIZE);
-
-       if (!sys->nr_sobjs)
-               goto fail;
-
-       cur_idx = obj_to_sheep(rw->cur_vnodes, rw->cur_nr_vnodes, oid, 0);
-
-       old_copies = get_max_copies(rw->old_nodes, rw->old_nr_nodes);
-       cur_copies = get_max_copies(rw->cur_nodes, rw->cur_nr_nodes);
-
-       copy_idx = -1;
-       for (i = 0; i < cur_copies; i++) {
-               int n = obj_to_sheep(rw->cur_vnodes, rw->cur_nr_vnodes, oid, i);
-               if (is_myself(rw->cur_vnodes[n].addr, rw->cur_vnodes[n].port)) {
-                       copy_idx = i;
-                       break;
-               }
-       }
-       if (copy_idx < 0) {
-               eprintf("bug: copy_idx < 0\n");
-               goto out;
+       copy_idx = get_replica_idx(rw, oid, &copy_nr);
+       if (copy_idx < 0) {     /* no copy of oid maps to this node */
+               ret = -1;
+               goto err;
        }
-
-       dprintf("%"PRIu32", %"PRIu32", %"PRIu32"\n", cur_idx, rw->cur_nr_nodes,
-               copy_idx);
-
-       ret = __recover_one(rw, rw->old_vnodes, rw->old_nr_vnodes, old_copies,
-                           rw->cur_vnodes, rw->cur_nr_vnodes, cur_copies,
-                           cur_idx, copy_idx, epoch, epoch - 1, oid,
-                           buf, SD_DATA_OBJ_SIZE);
-       if (ret == 0)
-               goto out;
-
-       for (i = 0; i < cur_copies; i++) {
-               if (i == copy_idx)
-                       continue;
-               ret = __recover_one(rw, rw->old_vnodes, rw->old_nr_vnodes, old_copies,
-                                   rw->cur_vnodes, rw->cur_nr_vnodes, cur_copies, cur_idx, i,
-                                   epoch, epoch - 1, oid, buf, SD_DATA_OBJ_SIZE);
-               if (ret == 0)
-                       goto out;
+       ret = do_recover_object(rw, copy_idx);
+       if (ret < 0) {  /* fall back to the other copies of oid */
+               for (i = 0; i < copy_nr; i++) {
+                       if (i == copy_idx)
+                               continue;
+                       ret = do_recover_object(rw, i);
+                       if (ret == 0)
+                               break;
+               }
        }
-fail:
-       eprintf("failed to recover object %"PRIx64"\n", oid);
-out:
-       free(buf);
+err:
+       if (ret < 0)
+               eprintf("failed to recover object %"PRIx64"\n", oid);
 }
 
 static struct recovery_work *suspended_recovery_work;
@@ -1520,7 +1546,7 @@ static void recover_done(struct work *work, int idx)
        }
 
        if (rw->done < rw->count && !next_rw) {
-               rw->work.fn = recover_one;
+               rw->work.fn = recover_object;
 
                if (is_access_to_busy_objects(oid)) {
                        suspended_recovery_work = rw;
-- 
1.7.8.rc3

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to