Currently, sheepdog cannot handle a node failure that occurs while recovery is
in progress, so two nodes failing at once brings the whole system down.

This patch adds support for recovering from such failures.

Signed-off-by: MORITA Kazutaka <[email protected]>
---
 collie/collie.h          |    2 +-
 collie/group.c           |   10 +-
 collie/store.c           |  684 +++++++++++++++++++++++++++++++---------------
 include/sheepdog_proto.h |   44 +++-
 4 files changed, 502 insertions(+), 238 deletions(-)

diff --git a/collie/collie.h b/collie/collie.h
index 3b17ee7..fac6809 100644
--- a/collie/collie.h
+++ b/collie/collie.h
@@ -122,7 +122,7 @@ int remove_epoch(int epoch);
 int set_cluster_ctime(uint64_t ctime);
 uint64_t get_cluster_ctime(void);
 
-int start_recovery(uint32_t epoch, int add);
+int start_recovery(uint32_t epoch);
 
 static inline int is_myself(struct sheepdog_node_list_entry *e)
 {
diff --git a/collie/group.c b/collie/group.c
index a2d7425..d4b5449 100644
--- a/collie/group.c
+++ b/collie/group.c
@@ -768,17 +768,15 @@ static void __sd_deliver(struct work *work, int idx)
 static void __sd_deliver_done(struct work *work, int idx)
 {
        struct work_deliver *w = container_of(work, struct work_deliver, work);
-/*     struct message_header *m = w->msg; */
-/*     struct cluster_info *ci = w->ci; */
+       struct message_header *m = w->msg;
 
        /*
         * FIXME: we want to recover only after all nodes are fully
         * synchronized
         */
 
-       /* disabled for now */
-/*     if (m->done && m->op == SD_MSG_JOIN) */
-/*             start_recovery(ci, ci->epoch, 1); */
+       if (m->done && m->op == SD_MSG_JOIN && sys->epoch >= 2)
+               start_recovery(sys->epoch);
 
        free(w->msg);
        free(w);
@@ -904,7 +902,7 @@ static void __sd_confch_done(struct work *work, int idx)
        if (w->left_list_entries) {
                if (w->left_list_entries > 1)
                        eprintf("we can't handle %Zd\n", w->left_list_entries);
-               start_recovery(sys->epoch, 0);
+               start_recovery(sys->epoch);
        }
 
        free(w->member_list);
diff --git a/collie/store.c b/collie/store.c
index 9b5584f..b35bbf5 100644
--- a/collie/store.c
+++ b/collie/store.c
@@ -76,23 +76,66 @@ static int stat_sheep(uint64_t *store_size, uint64_t 
*store_free, uint32_t epoch
        return SD_RES_SUCCESS;
 }
 
-static int get_obj_list(struct request *req)
+static int is_obj_in_range(uint64_t oid, uint64_t start, uint64_t end)
+{
+       uint64_t hval = fnv_64a_buf(&oid, sizeof(oid), FNV1A_64_INIT);
+
+       if (start < end)
+               return (start < hval && hval <= end);
+       else
+               return (start < hval || hval <= end);
+}
+
+static int get_obj_list(struct request *req, char *buf, int buf_len)
 {
        DIR *dir;
        struct dirent *d;
-       struct sd_obj_req *hdr = (struct sd_obj_req *)&req->rq;
-       struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp;
+       struct sd_list_req *hdr = (struct sd_list_req *)&req->rq;
+       struct sd_list_rsp *rsp = (struct sd_list_rsp *)&req->rp;
        uint64_t oid;
-       uint64_t oid_hash;
-       uint64_t start_hash = hdr->oid;
-       uint64_t end_hash = hdr->cow_oid;
+       uint64_t start_hash = hdr->start;
+       uint64_t end_hash = hdr->end;
+       uint32_t epoch = hdr->tgt_epoch;
        char path[1024];
        uint64_t *p = (uint64_t *)req->data;
        int nr = 0;
+       uint64_t *objlist = NULL;
+       int obj_nr = 0, fd, i;
+       struct sheepdog_node_list_entry *e;
+       int e_nr;
+       int idx;
+       int res = SD_RES_SUCCESS;
+
+       if (epoch == 1)
+               goto local;
+
+       snprintf(path, sizeof(path), "%s%08u/list", obj_path, epoch);
+
+       fd = open(path, O_RDONLY);
+       if (fd < 0) {
+               eprintf("failed to open %s, %s\n", path, strerror(errno));
+               close(fd);
+               return SD_RES_EIO;
+       }
+       obj_nr = read(fd, buf, buf_len);
+       obj_nr /= sizeof(uint64_t);
+       objlist = (uint64_t *)buf;
+       for (i = 0; i < obj_nr; i++) {
+               if (is_obj_in_range(objlist[i], start_hash, end_hash)) {
+                       dprintf("%u, %016lx, %016lx %016lx\n", epoch,
+                               objlist[i], start_hash, end_hash);
+                       p[nr++] = objlist[i];
+               }
 
-       snprintf(path, sizeof(path), "%s%08u/", obj_path, hdr->obj_ver);
+               if (nr * sizeof(uint64_t) >= hdr->data_length)
+                       break;
+       }
+       close(fd);
 
-       dprintf("%d\n", sys->this_node.port);
+local:
+       snprintf(path, sizeof(path), "%s%08u/", obj_path, hdr->tgt_epoch);
+
+       dprintf("%d, %s\n", sys->this_node.port, path);
 
        dir = opendir(path);
        if (!dir) {
@@ -101,38 +144,66 @@ static int get_obj_list(struct request *req)
        }
 
        while ((d = readdir(dir))) {
-               int got = 0;
-
                if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))
                        continue;
 
                oid = strtoull(d->d_name, NULL, 16);
-               oid_hash = fnv_64a_buf(&oid, sizeof(oid), FNV1A_64_INIT);
+               if (oid == 0)
+                       continue;
+
+               for (i = 0; i < obj_nr; i++)
+                       if (objlist[i] == oid)
+                               break;
+               if (i < obj_nr)
+                       continue;
 
-               if ((nr + 1) * sizeof(uint64_t) > hdr->data_length)
+               if (is_obj_in_range(oid, start_hash, end_hash)) {
+                       dprintf("%u, %016lx, %016lx %016lx\n", epoch,
+                               oid, start_hash, end_hash);
+                       p[nr++] = oid;
+               }
+
+               if (nr * sizeof(uint64_t) >= hdr->data_length)
                        break;
+       }
 
-               if (start_hash < end_hash) {
-                       if (oid_hash >= start_hash && oid_hash < end_hash)
-                               got = 1;
-               } else
-                       if (end_hash <= oid_hash || oid_hash < start_hash)
-                               got = 1;
+       eprintf("nr = %d\n", nr);
+       rsp->data_length = nr * sizeof(uint64_t);
 
-               dprintf("%d, %u, %016lx, %016lx, %016lx %016lx\n", got, 
hdr->obj_ver,
-                       oid, oid_hash, start_hash, end_hash);
+       e_nr = epoch_log_read(epoch, buf, buf_len);
+       e_nr /= sizeof(*e);
+       e = (struct sheepdog_node_list_entry *)buf;
 
-               if (got) {
-                       *(p + nr) = oid;
-                       nr++;
-               }
+       if (e_nr <= sys->nr_sobjs) {
+               rsp->next = end_hash;
+               goto out;
        }
 
-       rsp->data_length = nr * 8;
+       for (idx = 0; idx < e_nr; idx++) {
+               if (e[idx].id == sys->this_node.id)
+                       break;
+       }
+       if (idx != e_nr) {
+               uint64_t hval = e[idx % e_nr].id;
 
+               rsp->next = end_hash;
+
+               if (start_hash < end_hash) {
+                       if (start_hash < hval && hval <= end_hash)
+                               rsp->next = hval;
+               } else
+                       if (start_hash < hval || hval <= end_hash)
+                               rsp->next = hval;
+
+               dprintf("%u, %016lx, %016lx %016lx\n", epoch, hval,
+                       start_hash, end_hash);
+       } else
+               res = SD_RES_SYSTEM_ERROR;
+
+out:
        closedir(dir);
 
-       return SD_RES_SUCCESS;
+       return res;
 }
 
 static int read_from_one(uint64_t oid,
@@ -225,6 +296,7 @@ static int forward_obj_req(struct request *req, char *buf)
        struct sd_rsp *rsp = (struct sd_rsp *)&hdr2;
        uint64_t oid = hdr->oid;
        int copies;
+       uint32_t epoch;
 
        e = zalloc(SD_MAX_NODES * sizeof(struct sheepdog_node_list_entry));
 again:
@@ -243,7 +315,11 @@ again:
 
                /* TODO: we can do better; we need to chech this first */
                if (is_myself(&e[n])) {
-                       ret = store_queue_request_local(req, buf, sys->epoch);
+                       if (hdr->flags & SD_FLAG_CMD_RECOVERY)
+                               epoch = hdr->tgt_epoch;
+                       else
+                               epoch = sys->epoch;
+                       ret = store_queue_request_local(req, buf, epoch);
                        memcpy(rsp, &req->rp, sizeof(*rsp));
                        rsp->result = ret;
                        goto done;
@@ -334,63 +410,13 @@ static int ob_open(uint32_t epoch, uint64_t oid, int 
aflags, int *ret)
        return fd;
 }
 
-static int is_my_obj(uint64_t oid, int copies)
-{
-       int i, n, nr;
-       struct sheepdog_node_list_entry e[SD_MAX_NODES];
-
-       nr = build_node_list(&sys->sd_node_list, e);
-
-       for (i = 0; i < copies; i++) {
-               n = obj_to_sheep(e, nr, oid, i);
-               if (is_myself(&e[n]))
-                       return 1;
-       }
-
-       return 0;
-}
-
 int update_epoch_store(uint32_t epoch)
 {
-       int ret;
-       char new[1024], old[1024];
-       struct stat s;
-       DIR *dir;
-       struct dirent *d;
-       uint64_t oid;
+       char new[1024];
 
        snprintf(new, sizeof(new), "%s%08u/", obj_path, epoch);
        mkdir(new, def_dmode);
 
-       snprintf(old, sizeof(old), "%s%08u/", obj_path, epoch - 1);
-
-       ret = stat(old, &s);
-       if (ret)
-               return 0;
-
-       dir = opendir(old);
-       if (!dir) {
-               eprintf("%s, %s, %m\n", old, new);
-               return 1;
-       }
-
-       while ((d = readdir(dir))) {
-               if (!strcmp(d->d_name, ".") || !strcmp(d->d_name, ".."))
-                       continue;
-
-               oid = strtoull(d->d_name, NULL, 16);
-               /* TODO: use proper object coipes */
-               if (is_my_obj(oid, sys->nr_sobjs)) {
-                       snprintf(new, sizeof(new), "%s%08u/%s", obj_path, epoch,
-                               d->d_name);
-                       snprintf(old, sizeof(old), "%s%08u/%s", obj_path, epoch 
- 1,
-                               d->d_name);
-                       link(old, new);
-               }
-       }
-
-       closedir(dir);
-
        return 0;
 }
 
@@ -517,6 +543,13 @@ out:
        if (fd != -1)
                close(fd);
 
+       if (ret == SD_RES_NO_OBJ && hdr->flags & SD_FLAG_CMD_RECOVERY) {
+               int len  = epoch_log_read(epoch - 1, req->data, 
hdr->data_length);
+               if (len < 0)
+                       len = 0;
+               rsp->data_length = len;
+       }
+
        return ret;
 }
 
@@ -547,13 +580,16 @@ void store_queue_request(struct work *work, int idx)
                        goto out;
        }
 
+       if (hdr->flags & SD_FLAG_CMD_RECOVERY)
+               epoch = hdr->tgt_epoch;
+
        if (opcode == SD_OP_STAT_SHEEP) {
                ret = stat_sheep(&nrsp->store_size, &nrsp->store_free, epoch);
                goto out;
        }
 
        if (opcode == SD_OP_GET_OBJ_LIST) {
-               ret = get_obj_list(req);
+               ret = get_obj_list(req, buf, SD_DATA_OBJ_SIZE);
                goto out;
        }
 
@@ -731,19 +767,26 @@ static int node_distance(int my, int her, int nr)
        return (my + nr - her) % nr;
 }
 
-static int node_from_distance(int my, int dist, int nr)
+static int contains_node(uint64_t id, struct sheepdog_node_list_entry *entry,
+                        int nr, int base_idx)
 {
-       return (my + nr - dist) % nr;
+       int i;
+
+       for (i = 0; i < sys->nr_sobjs; i++) {
+               if (entry[(base_idx + i) % nr].id == id)
+                       return (base_idx + i) % nr;
+       }
+       return -1;
 }
 
 struct recovery_work {
        uint32_t epoch;
        uint32_t done;
 
-       uint32_t iteration;
-
        struct sheepdog_node_list_entry e;
 
+       struct timer timer;
+       int retry;
        struct work work;
        struct list_head rw_siblings;
 
@@ -754,84 +797,272 @@ struct recovery_work {
 static LIST_HEAD(recovery_work_list);
 static int recovering;
 
-static void recover_one(struct work *work, int idx)
+static int find_tgt_node(struct sheepdog_node_list_entry *old_entry, int 
old_nr, int old_idx,
+                        struct sheepdog_node_list_entry *cur_entry, int 
cur_nr, int cur_idx,
+                        int copy_idx)
 {
-       struct recovery_work *rw = container_of(work, struct recovery_work, 
work);
-       struct sheepdog_node_list_entry *e = &rw->e;
+       int i, idx;
+
+       dprintf("%d, %d, %d, %d, %d\n", old_idx, old_nr, cur_idx, cur_nr, 
copy_idx);
+
+       if (copy_idx < cur_nr) {
+               idx = contains_node(cur_entry[(cur_idx + copy_idx) % cur_nr].id,
+                                   old_entry, old_nr, old_idx);
+               if (idx >= 0) {
+                       dprintf("%d, %d, %d, %d\n", idx, copy_idx, cur_idx, 
cur_nr);
+                       return idx;
+               }
+       }
+
+       for (i = 0; ; i++) {
+               if (i < cur_nr) {
+                       idx = contains_node(cur_entry[(cur_idx + i) % 
cur_nr].id,
+                                           old_entry, old_nr, old_idx);
+                       if (idx >= 0)
+                               continue;
+
+                       while (contains_node(old_entry[old_idx].id, cur_entry, 
cur_nr, cur_idx) >= 0)
+                               old_idx = (old_idx + 1) % old_nr;
+
+               }
+               if (i == copy_idx) {
+                       dprintf("%d, %d, %d, %d\n", old_idx, copy_idx, cur_idx, 
cur_nr);
+                       return old_idx;
+               }
+
+               old_idx = (old_idx + 1) % old_nr;
+       }
+       return -1;
+}
+
+static int __recover_one(struct recovery_work *rw,
+                        struct sheepdog_node_list_entry *_old_entry, int 
old_nr,
+                        struct sheepdog_node_list_entry *_cur_entry, int 
cur_nr, int cur_idx,
+                        int copy_idx, uint32_t epoch, uint32_t tgt_epoch,
+                        uint64_t oid, char *buf, int buf_len)
+{
+       struct sheepdog_node_list_entry *e;
        struct sd_obj_req hdr;
-       struct sd_obj_rsp *rsp;
+       struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&hdr;
        char name[128];
-       char *buf = zero_block + idx * SD_DATA_OBJ_SIZE;
        unsigned wlen = 0, rlen = SD_DATA_OBJ_SIZE;
        int fd, ret;
-       uint64_t oid = *(((uint64_t *)rw->buf) + rw->done);
+       struct sheepdog_node_list_entry old_entry[SD_MAX_NODES],
+               cur_entry[SD_MAX_NODES], *next_entry;
+       int next_nr;
+       int tgt_idx = -1;
+       int old_idx;
+
+       memcpy(old_entry, _old_entry, sizeof(*old_entry) * old_nr);
+       memcpy(cur_entry, _cur_entry, sizeof(*cur_entry) * cur_nr);
+next:
+       dprintf("recover obj %lx from epoch %d\n", oid, tgt_epoch);
+       old_idx = obj_to_sheep(old_entry, old_nr, oid, 0);
+
+       tgt_idx = find_tgt_node(old_entry, old_nr, old_idx, cur_entry, cur_nr, 
cur_idx, copy_idx);
+       if (tgt_idx < 0) {
+               eprintf("cannot find target node, %lx\n", oid);
+               return -1;
+       }
+       e = old_entry + tgt_idx;
 
-       eprintf("%d %d, %16lx\n", rw->done, rw->count, oid);
+       if (e->id == sys->this_node.id) {
+               char old[PATH_MAX], new[PATH_MAX];
+
+               snprintf(old, sizeof(old), "%s%08u/%016" PRIx64, obj_path,
+                        tgt_epoch, oid);
+               snprintf(new, sizeof(new), "%s%08u/%016" PRIx64, obj_path,
+                        epoch, oid);
+               dprintf("link from %s to %s\n", old, new);
+               if (link(old, new) == 0)
+                       return 0;
+
+               if (errno == ENOENT) {
+                       next_nr = epoch_log_read(tgt_epoch, buf, buf_len);
+                       if (next_nr <= 0) {
+                               eprintf("no previous epoch, %d\n", tgt_epoch);
+                               return -1;
+                       }
+                       next_entry = (struct sheepdog_node_list_entry *)buf;
+                       next_nr /= sizeof(*next_entry);
+                       goto not_found;
+               }
+
+               eprintf("cannot recover from local, %s, %s\n", old, new);
+               return -1;
+       }
 
        addr_to_str(name, sizeof(name), e->addr, 0);
 
        fd = connect_to(name, e->port);
        if (fd < 0) {
-               eprintf("%s %d\n", name, e->port);
-               return;
+               eprintf("failed to connect to %s:%d\n", name, e->port);
+               return -1;
        }
 
        memset(&hdr, 0, sizeof(hdr));
        hdr.opcode = SD_OP_READ_OBJ;
        hdr.oid = oid;
        hdr.epoch = sys->epoch;
-       hdr.flags = 0;
+       hdr.flags = SD_FLAG_CMD_RECOVERY | SD_FLAG_CMD_FORWARD;
+       hdr.tgt_epoch = tgt_epoch;
        hdr.data_length = rlen;
 
        ret = exec_req(fd, (struct sd_req *)&hdr, buf, &wlen, &rlen);
 
        close(fd);
 
+       if (ret < 0) {
+               eprintf("%d\n", rsp->result);
+               return -1;
+       }
+
        rsp = (struct sd_obj_rsp *)&hdr;
 
-       if (rsp->result != SD_RES_SUCCESS) {
+       if (rsp->result == SD_RES_SUCCESS) {
+               fd = ob_open(epoch, oid, O_CREAT, &ret);
+               ret = write(fd, buf, SD_DATA_OBJ_SIZE);
+               if (ret != SD_DATA_OBJ_SIZE) {
+                       eprintf("failed to write object\n");
+                       return -1;
+               }
+
+               ret = fsetxattr(fd, ANAME_COPIES, &rsp->copies,
+                               sizeof(rsp->copies), 0);
+               if (ret) {
+                       eprintf("couldn't set xattr\n");
+                       return -1;
+               }
+
+               close(fd);
+               dprintf("recovered oid %lx to epoch %d\n", oid, epoch);
+               return 0;
+       }
+
+       if (rsp->result == SD_RES_NEW_NODE_VER || rsp->result == 
SD_RES_OLD_NODE_VER) {
+               eprintf("try again, %d, %lx\n", rsp->result, oid);
+               rw->retry = 1;
+               return 0;
+       }
+
+       if (rsp->result != SD_RES_NO_OBJ || rsp->data_length == 0) {
                eprintf("%d\n", rsp->result);
+               return -1;
+       }
+       next_entry = (struct sheepdog_node_list_entry *)buf;
+       next_nr = rsp->data_length / sizeof(*old_entry);
+
+not_found:
+       copy_idx = node_distance(tgt_idx, old_idx, old_nr);
+       dprintf("%d, %d, %d, %d, %d, %d\n", rsp->result, rsp->data_length, 
tgt_idx,
+               old_idx, old_nr, copy_idx);
+
+       memcpy(cur_entry, old_entry, sizeof(*old_entry) * old_nr);
+       cur_nr = old_nr;
+       cur_idx = old_idx;
+
+       memcpy(old_entry, next_entry, next_nr * sizeof(*next_entry));
+       old_nr = next_nr;
+
+       tgt_epoch--;
+       goto next;
+}
+
+static void recover_one(struct work *work, int idx)
+{
+       struct recovery_work *rw = container_of(work, struct recovery_work, 
work);
+       char *buf = zero_block + idx * SD_DATA_OBJ_SIZE;
+       int ret;
+       uint64_t oid = *(((uint64_t *)rw->buf) + rw->done);
+       struct sheepdog_node_list_entry old_entry[SD_MAX_NODES],
+               cur_entry[SD_MAX_NODES];
+       int old_nr, cur_nr;
+       uint32_t epoch = rw->epoch;
+       int i, my_idx = -1, copy_idx, cur_idx = -1;
+
+       eprintf("%d %d, %16lx\n", rw->done, rw->count, oid);
+
+       cur_nr = epoch_log_read(epoch, (char *)cur_entry, sizeof(cur_entry));
+       if (cur_nr <= 0) {
+               eprintf("failed to read current epoch, %d\n", epoch);
                return;
        }
+       cur_nr /= sizeof(struct sheepdog_node_list_entry);
+
+       old_nr = epoch_log_read(epoch - 1, (char *)old_entry, 
sizeof(old_entry));
+       if (old_nr <= 0) {
+               eprintf("failed to read previous epoch, %d\n", epoch - 1);
+               goto fail;
+       }
+       old_nr /= sizeof(struct sheepdog_node_list_entry);
 
-       fd = ob_open(rw->epoch, oid, O_CREAT, &ret);
-       write(fd, buf, SD_DATA_OBJ_SIZE);
+       if (!sys->nr_sobjs)
+               goto fail;
+
+       cur_idx = obj_to_sheep(cur_entry, cur_nr, oid, 0);
+
+       for (i = 0; i < cur_nr; i++) {
+               if (cur_entry[i].id == sys->this_node.id) {
+                       my_idx = i;
+                       break;
+               }
+       }
+       copy_idx = node_distance(my_idx, cur_idx, cur_nr);
+       dprintf("%d, %d, %d, %d\n", my_idx, cur_idx, cur_nr, copy_idx);
+
+       ret = __recover_one(rw, old_entry, old_nr, cur_entry, cur_nr, cur_idx,
+                           copy_idx, epoch, epoch - 1, oid, buf, 
SD_DATA_OBJ_SIZE);
+       if (ret == 0)
+               return;
+
+       for (i = 0; i < sys->nr_sobjs; i++) {
+               if (i == copy_idx)
+                       continue;
+               ret = __recover_one(rw, old_entry, old_nr,
+                                   cur_entry, cur_nr, cur_idx, i,
+                                   epoch, epoch - 1, oid, buf, 
SD_DATA_OBJ_SIZE);
+               if (ret == 0)
+                       return;
+       }
+fail:
+       eprintf("failed to recover object %lx\n", oid);
 }
 
 static void __start_recovery(struct work *work, int idx);
 static void __start_recovery_done(struct work *work, int idx);
 
+static void recover_one_timer(void *data)
+{
+       struct recovery_work *rw = (struct recovery_work *)data;
+       queue_work(dobj_queue, &rw->work);
+}
+
 static void recover_one_done(struct work *work, int idx)
 {
        struct recovery_work *rw = container_of(work, struct recovery_work, 
work);
 
-       if (rw->done < rw->count) {
-               rw->done++;
-               queue_work(dobj_queue, &rw->work);
+       if (rw->retry) {
+               rw->retry = 0;
+
+               rw->timer.callback = recover_one_timer;
+               rw->timer.data = rw;
+               add_timer(&rw->timer, 2);
                return;
        }
 
-       if (rw->iteration) {
-               if (++rw->iteration <= sys->nr_sobjs) {
-                       free(rw->buf);
+       rw->done++;
 
-                       rw->done = 0;
-
-                       rw->work.fn = __start_recovery;
-                       rw->work.done = __start_recovery_done;
-
-                       queue_work(dobj_queue, &rw->work);
-
-                       return;
-               }
+       if (rw->done < rw->count && rw->rw_siblings.next == 
&recovery_work_list) {
+               queue_work(dobj_queue, &rw->work);
+               return;
        }
 
+       dprintf("recovery done, %d\n", rw->epoch);
        recovering--;
 
        list_del(&rw->rw_siblings);
 
-       if (rw->buf)
-               free(rw->buf);
+       free(rw->buf);
        free(rw);
 
        if (!list_empty(&recovery_work_list)) {
@@ -843,16 +1074,16 @@ static void recover_one_done(struct work *work, int idx)
        }
 }
 
-static int fill_obj_list(struct recovery_work *rw,
-                        struct sheepdog_node_list_entry *e,
-                        uint64_t start_hash, uint64_t end_hash)
+static int __fill_obj_list(struct recovery_work *rw,
+                          struct sheepdog_node_list_entry *e,
+                          uint64_t start_hash, uint64_t end_hash, uint64_t 
*done_hash)
 {
        int fd, ret;
        uint32_t epoch = rw->epoch;
        unsigned wlen, rlen;
        char name[128];
-       struct sd_obj_req hdr;
-       struct sd_obj_rsp *rsp;
+       struct sd_list_req hdr;
+       struct sd_list_rsp *rsp;
 
        addr_to_str(name, sizeof(name), e->addr, 0);
 
@@ -870,34 +1101,66 @@ static int fill_obj_list(struct recovery_work *rw,
        memset(&hdr, 0, sizeof(hdr));
        hdr.opcode = SD_OP_GET_OBJ_LIST;
        hdr.epoch = sys->epoch;
-       hdr.oid = start_hash;
-       hdr.cow_oid = end_hash;
-       hdr.obj_ver = epoch - 1;
+       hdr.start = start_hash;
+       hdr.end = end_hash;
+       hdr.tgt_epoch = epoch - 1;
        hdr.flags = 0;
        hdr.data_length = rlen;
 
-       dprintf("%016lx, %016lx\n", hdr.oid, hdr.cow_oid);
+       dprintf("%016lx, %016lx\n", hdr.start, hdr.end);
 
-       rw->buf = malloc(rlen);
        memcpy(&rw->e, e, sizeof(rw->e));
 
-       ret = exec_req(fd, (struct sd_req *)&hdr, rw->buf, &wlen, &rlen);
+       ret = exec_req(fd, (struct sd_req *)&hdr, rw->buf + rw->count * 
sizeof(uint64_t), &wlen, &rlen);
 
        close(fd);
 
-       rsp = (struct sd_obj_rsp *)&hdr;
+       rsp = (struct sd_list_rsp *)&hdr;
 
        if (rsp->result != SD_RES_SUCCESS) {
-               eprintf("%d\n", rsp->result);
-               return -1;
+               rw->retry = 1;
+               *done_hash = end_hash;
+               eprintf("try again, %d\n", rsp->result);
+               return 0;
        }
 
        dprintf("%d\n", rsp->data_length);
 
        if (rsp->data_length)
-               rw->count = rsp->data_length / sizeof(uint64_t);
-       else
-               rw->count = 0;
+               rw->count += rsp->data_length / sizeof(uint64_t);
+
+       *done_hash = rsp->next;
+
+       return 0;
+}
+
+static int fill_obj_list(struct recovery_work *rw,
+                        struct sheepdog_node_list_entry *old_entry, int old_nr,
+                        struct sheepdog_node_list_entry *cur_entry, int cur_nr,
+                        uint64_t start_hval, uint64_t end_hval)
+{
+       int i, idx, old_idx, cur_idx;
+       uint64_t hval, done_hval = end_hval;
+
+       hval = start_hval;
+again:
+       old_idx = hval_to_sheep(old_entry, old_nr, hval + 1, 0);
+       cur_idx = hval_to_sheep(cur_entry, cur_nr, hval + 1, 0);
+
+       for (i = 0; i < sys->nr_sobjs; i++) {
+               idx = find_tgt_node(old_entry, old_nr, old_idx, cur_entry, 
cur_nr, cur_idx, i);
+               dprintf("%d, %d\n", idx, i);
+               if (__fill_obj_list(rw, old_entry + idx, hval, end_hval, 
&done_hval) == 0)
+                       break;
+       }
+       if (i == sys->nr_sobjs)
+               return -1;
+
+       if (done_hval != end_hval) {
+               dprintf("%lx, %lx\n", done_hval, end_hval);
+               hval = done_hval;
+               goto again;
+       }
 
        return 0;
 }
@@ -909,153 +1172,122 @@ static void __start_recovery(struct work *work, int 
idx)
        struct sheepdog_node_list_entry old_entry[SD_MAX_NODES],
                cur_entry[SD_MAX_NODES];
        int old_nr, cur_nr;
-       int my_idx = -1, ch_idx = -1;
-       int i, j, n;
+       int my_idx = -1;
+       int i, fd;
        uint64_t start_hash, end_hash;
+       char path[PATH_MAX];
 
        dprintf("%u\n", epoch);
 
        cur_nr = epoch_log_read(epoch, (char *)cur_entry, sizeof(cur_entry));
-       if (cur_nr <= 0)
+       if (cur_nr <= 0) {
+               eprintf("failed to read epoch log, %d\n", epoch);
                goto fail;
+       }
        cur_nr /= sizeof(struct sheepdog_node_list_entry);
 
        old_nr = epoch_log_read(epoch - 1, (char *)old_entry, 
sizeof(old_entry));
-       if (old_nr <= 0)
+       if (old_nr <= 0) {
+               eprintf("failed to read epoch log, %d\n", epoch - 1);
                goto fail;
+       }
        old_nr /= sizeof(struct sheepdog_node_list_entry);
 
-       if (!sys->nr_sobjs || cur_nr < sys->nr_sobjs || old_nr < sys->nr_sobjs)
+       if (!sys->nr_sobjs)
                goto fail;
 
-       if (cur_nr < old_nr) {
-               for (i = 0; i < old_nr; i++) {
-                       if (is_myself(&old_entry[i])) {
-                               my_idx = i;
-                               break;
-                       }
-               }
-
-               dprintf("%u %u %u, %d\n", cur_nr, old_nr, epoch, my_idx);
-
-               for (i = 0; i < old_nr; i++) {
-                       for (j = 0; j < cur_nr; j++) {
-                               if (old_entry[i].id == cur_entry[j].id)
-                                       break;
-                       }
-
-                       if (j == cur_nr)
-                               ch_idx = i;
-               }
-
-               dprintf("%u %u %u\n", my_idx, ch_idx,
-                       node_distance(my_idx, ch_idx, old_nr));
-
-               if (node_distance(my_idx, ch_idx, old_nr) > sys->nr_sobjs)
-                       return;
-
-               n = node_from_distance(my_idx, sys->nr_sobjs, old_nr);
-
-               dprintf("%d %d\n", n, sys->nr_sobjs);
-
-               start_hash = old_entry[(n - 1 + old_nr) % old_nr].id;
-               end_hash = old_entry[n].id;
-
-               /* FIXME */
-               if (node_distance(my_idx, ch_idx, old_nr) == sys->nr_sobjs) {
-                       n++;
-                       n %= old_nr;
-               }
-
-               fill_obj_list(rw, old_entry + n, start_hash, end_hash);
-       } else {
-               for (i = 0; i < cur_nr; i++) {
-                       if (is_myself(&cur_entry[i])) {
-                               my_idx = i;
-                               break;
-                       }
+       for (i = 0; i < cur_nr; i++) {
+               if (cur_entry[i].id == sys->this_node.id) {
+                       my_idx = i;
+                       break;
                }
+       }
+       start_hash = cur_entry[(my_idx - sys->nr_sobjs + cur_nr) % cur_nr].id;
+       end_hash = cur_entry[my_idx].id;
 
-               dprintf("%u %u %u, %d\n", cur_nr, old_nr, epoch, my_idx);
-
-               if (my_idx == -1)
-                       return;
-
-               n = node_from_distance(my_idx, rw->iteration, cur_nr);
-               start_hash = cur_entry[n].id;
-               end_hash = cur_entry[(n + 1 + cur_nr) % cur_nr].id;
-
-               if (rw->iteration == 1)
-                       n = (my_idx + 1 + cur_nr) % cur_nr;
-               else
-                       n = (n + 1 + cur_nr) % cur_nr;
-
-               dprintf("%u %u %u\n", my_idx, n, rw->iteration);
+       dprintf("fill obj list (from 0x%lx to 0x%lx)\n", start_hash, end_hash);
+       if (fill_obj_list(rw, old_entry, old_nr, cur_entry, cur_nr,
+                         start_hash, end_hash) != 0) {
+               eprintf("fatal recovery error\n");
+               goto fail;
+       }
 
-               start_hash = cur_entry[n].id;
-               end_hash = cur_entry[(n - 1 + cur_nr) % cur_nr].id;
+       snprintf(path, sizeof(path), "%s%08u/list", obj_path, epoch);
+       dprintf("write object list file to %s\n", path);
 
-               fill_obj_list(rw, cur_entry + n, start_hash, end_hash);
+       fd = open(path, O_RDWR | O_CREAT, def_fmode);
+       if (fd < 0) {
+               eprintf("failed to open %s, %s\n", path, strerror(errno));
+               goto fail;
        }
+       write(fd, rw->buf, sizeof(uint64_t) * rw->count);
+       close(fd);
 
        return;
-
 fail:
        rw->count = 0;
-       rw->iteration = 0;
        return;
 }
 
+static void start_recovery_timer(void *data)
+{
+       struct recovery_work *rw = (struct recovery_work *)data;
+       queue_work(dobj_queue, &rw->work);
+}
+
 static void __start_recovery_done(struct work *work, int idx)
 {
        struct recovery_work *rw = container_of(work, struct recovery_work, 
work);
 
-       if (!rw->count) {
-               if (rw->iteration) {
-                       if (++rw->iteration <= sys->nr_sobjs) {
-                               free(rw->buf);
+       if (rw->retry) {
+               rw->retry = 0;
 
-                               rw->work.fn = __start_recovery;
-                               rw->work.done = __start_recovery_done;
+               rw->timer.callback = start_recovery_timer;
+               rw->timer.data = rw;
+               add_timer(&rw->timer, 1);
+               return;
+       }
 
-                               queue_work(dobj_queue, &rw->work);
+       if (rw->count && rw->rw_siblings.next == &recovery_work_list) {
+               rw->work.fn = recover_one;
+               rw->work.done = recover_one_done;
 
-                               return;
-                       }
-               }
+               /* TODO: we should avoid races with qemu I/Os */
+               /* rw->work.attr = WORK_ORDERED; */
 
-               free(rw->buf);
-               free(rw);
+               queue_work(dobj_queue, &rw->work);
                return;
        }
 
-       rw->work.fn = recover_one;
-       rw->work.done = recover_one_done;
+       dprintf("recovery done, %d\n", rw->epoch);
+       recovering--;
 
-       /* TODO: we should avoid races with qemu I/Os */
-/*     rw->work.attr = WORK_ORDERED; */
+       list_del(&rw->rw_siblings);
 
-       queue_work(dobj_queue, &rw->work);
+       free(rw->buf);
+       free(rw);
+
+       if (!list_empty(&recovery_work_list)) {
+               rw = list_first_entry(&recovery_work_list,
+                                     struct recovery_work, rw_siblings);
+
+               recovering++;
+               queue_work(dobj_queue, &rw->work);
+       }
 }
 
-int start_recovery(uint32_t epoch, int add)
+int start_recovery(uint32_t epoch)
 {
        struct recovery_work *rw;
 
-       /* disable for now */
-       if (add)
-               return 0;
-
        rw = zalloc(sizeof(struct recovery_work));
        if (!rw)
                return -1;
 
+       rw->buf = malloc(1 << 20); /* FIXME */
        rw->epoch = epoch;
        rw->count = 0;
 
-       if (add)
-               rw->iteration = 1;
-
        rw->work.fn = __start_recovery;
        rw->work.done = __start_recovery_done;
 
diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h
index 24f06ae..a41eb50 100644
--- a/include/sheepdog_proto.h
+++ b/include/sheepdog_proto.h
@@ -52,6 +52,7 @@
 #define SD_FLAG_CMD_WRITE    0x01
 #define SD_FLAG_CMD_COW      0x02
 #define SD_FLAG_CMD_FORWARD  0x04
+#define SD_FLAG_CMD_RECOVERY 0x08
 
 #define SD_STATUS_OK            0x00
 #define SD_STATUS_STARTUP       0x01
@@ -169,7 +170,7 @@ struct sd_obj_req {
        uint64_t        oid;
        uint64_t        cow_oid;
        uint32_t        copies;
-       uint32_t        obj_ver;
+       uint32_t        tgt_epoch;
        uint64_t        offset;
 };
 
@@ -186,6 +187,31 @@ struct sd_obj_rsp {
        uint32_t        pad[5];
 };
 
+struct sd_list_req {
+       uint8_t         proto_ver;
+       uint8_t         opcode;
+       uint16_t        flags;
+       uint32_t        epoch;
+       uint32_t        id;
+       uint32_t        data_length;
+       uint64_t        start;
+       uint64_t        end;
+       uint32_t        tgt_epoch;
+       uint32_t        pad[3];
+};
+
+struct sd_list_rsp {
+       uint8_t         proto_ver;
+       uint8_t         opcode;
+       uint16_t        flags;
+       uint32_t        epoch;
+       uint32_t        id;
+       uint32_t        data_length;
+       uint32_t        result;
+       uint64_t        next;
+       uint32_t        pad[5];
+};
+
 struct sd_vdi_req {
        uint8_t         proto_ver;
        uint8_t         opcode;
@@ -281,17 +307,17 @@ static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
        return hval;
 }
 
-static inline int obj_to_sheep(struct sheepdog_node_list_entry *entries,
-                              int nr_entries, uint64_t oid, int idx)
+static inline int hval_to_sheep(struct sheepdog_node_list_entry *entries,
+                               int nr_entries, uint64_t id, int idx)
 {
-       uint64_t id;
        int i;
        struct sheepdog_node_list_entry *e = entries, *n;
 
-       id = fnv_64a_buf(&oid, sizeof(oid), FNV1A_64_INIT);
+       printf("%lx\n", id);
 
        for (i = 0; i < nr_entries - 1; i++, e++) {
                n = e + 1;
+               printf("%d, %lx, %lx, %lx\n", i, e->id, n->id, id);
                if (id > e->id && id <= n->id)
                        break;
        }
@@ -299,6 +325,14 @@ static inline int obj_to_sheep(struct sheepdog_node_list_entry *entries,
        return (i + 1 + idx) % nr_entries;
 }
 
+static inline int obj_to_sheep(struct sheepdog_node_list_entry *entries,
+                              int nr_entries, uint64_t oid, int idx)
+{
+       uint64_t id = fnv_64a_buf(&oid, sizeof(oid), FNV1A_64_INIT);
+
+       return hval_to_sheep(entries, nr_entries, id, idx);
+}
+
 static inline void print_node_list_entry(struct sheepdog_node_list_entry *e,
                                         char *str, size_t size)
 {
-- 
1.5.6.5

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to