From: Liu Yuan <[email protected]>

It is not thread safe to manipulate rw->oids[] in both the main and worker threads.
Add rw->prio_oids[] and restrict all rw->oids[] manipulation to
recover_object_main(), which is guaranteed to run only while no
recover_object_work is being executed.

This also fixes a nasty buffer overflow in rw->oids[], which was caused by an
insane memmove, and cleans up the code a bit.

Signed-off-by: Liu Yuan <[email protected]>
---
 sheep/recovery.c |  111 ++++++++++++++++++++++++++++++++++--------------------
 1 file changed, 71 insertions(+), 40 deletions(-)

diff --git a/sheep/recovery.c b/sheep/recovery.c
index 591c5d1..b4e2134 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -31,9 +31,10 @@ struct recovery_work {
        int stop;
        struct work work;
 
-       int nr_blocking;
        int count;
        uint64_t *oids;
+       uint64_t *prio_oids;
+       int nr_prio_oids;
 
        struct vnode_info *old_vnodes;
        struct vnode_info *cur_vnodes;
@@ -271,45 +272,22 @@ int is_recovery_init(void)
        return rw->state == RW_INIT;
 }
 
-static inline bool schedule_oid(uint64_t oid)
+static inline void prepare_schedule_oid(uint64_t oid)
 {
-       uint64_t hval, min_hval;
-       int i;
        struct recovery_work *rw = recovering_work;
+       int i;
 
-       /* Check if the oid is already scheduled in front */
-       for (i = 0; i < rw->nr_blocking; i++)
-               if (rw->oids[rw->done + i] == oid)
-                       return true;
+       for (i = 0; i < rw->nr_prio_oids; i++)
+               if (rw->prio_oids[i] == oid )
+                       return;
 
-       min_hval = fnv_64a_buf(&rw->oids[rw->done + rw->nr_blocking],
-                              sizeof(uint64_t), FNV1A_64_INIT);
-       hval = fnv_64a_buf(&oid, sizeof(uint64_t), FNV1A_64_INIT);
-
-       if (min_hval <= hval) {
-               uint64_t *p;
-               p = bsearch(&oid, rw->oids + rw->done + rw->nr_blocking,
-                           rw->count - rw->done - rw->nr_blocking, sizeof(oid),
-                           obj_cmp);
-               if (p) {
-                       dprintf("recover the object %" PRIx64 " first\n", oid);
-                       /* The first oid may be processed now */
-                       if (rw->nr_blocking == 0)
-                               rw->nr_blocking = 1;
-                       /* This oid should be recovered first */
-                       if (p > rw->oids + rw->done + rw->nr_blocking) {
-                               memmove(rw->oids + rw->done + rw->nr_blocking + 
1,
-                                       rw->oids + rw->done + rw->nr_blocking,
-                                       sizeof(uint64_t) * (p - (rw->oids + 
rw->done + rw->nr_blocking)));
-                               rw->oids[rw->done + rw->nr_blocking] = oid;
-                               rw->nr_blocking++;
-                       }
-                       return true;
-               }
-       }
+       /* The oid is currently being recovered */
+       if (rw->oids[rw->done] == oid)
+               return;
 
-       dprintf("the object %" PRIx64 " is not found\n", oid);
-       return false;
+       rw->prio_oids = xrealloc(rw->prio_oids, ++rw->nr_prio_oids);
+       rw->prio_oids[rw->nr_prio_oids - 1] = oid;
+       dprintf("%"PRIx64" nr_prio_oids %d\n", oid, rw->nr_prio_oids);
 }
 
 bool oid_in_recovery(uint64_t oid)
@@ -331,7 +309,14 @@ bool oid_in_recovery(uint64_t oid)
        if (rw->state == RW_INIT)
                return true;
 
-       return schedule_oid(oid);
+       if (!bsearch(&oid, rw->oids + rw->done, rw->count - rw->done,
+                    sizeof(oid), obj_cmp)) {
+               eprintf("%"PRIx64" is not in the recovery list\n", oid);
+               return false;
+       }
+
+       prepare_schedule_oid(oid);
+       return true;
 }
 
 static void free_recovery_work(struct recovery_work *rw)
@@ -368,6 +353,49 @@ static inline void finish_recovery(struct recovery_work 
*rw)
                sys->recovered_epoch);
 }
 
+static inline bool oid_in_prio_oids(struct recovery_work *rw, uint64_t oid)
+{
+       int i;
+
+       for (i = 0; i < rw->nr_prio_oids; i++)
+               if (rw->prio_oids[i] == oid)
+                       return true;
+       return false;
+}
+
+/*
+ * Schedule prio_oids to be recovered first in FIFO order
+ *
+ * rw->done is index of the original next object to be recovered and also the
+ * number of objects already recovered.
+ * we just move rw->prio_oids in between:
+ *   new_oids = [0..rw->done - 1] + [rw->prio_oids] + [rw->done]
+ */
+static inline void finish_schedule_oids(struct recovery_work *rw)
+{
+       int i, nr_recovered = rw->done, new_idx;
+       uint64_t *new_oids = xmalloc(1 << 20); /* FIXME */
+
+       memmove(new_oids, rw->oids, nr_recovered * sizeof(uint64_t));
+       memmove(new_oids + nr_recovered, rw->prio_oids,
+               rw->nr_prio_oids * sizeof(uint64_t));
+       new_idx = nr_recovered + rw->nr_prio_oids;
+
+       for (i = rw->done; i < rw->count; i++) {
+               if (oid_in_prio_oids(rw, rw->oids[i]))
+                       continue;
+               new_oids[new_idx++] = rw->oids[i];
+       }
+       dprintf("nr_recovered %d, nr_prio_oids %d, count %d, new %d\n",
+               nr_recovered, rw->nr_prio_oids, rw->count, new_idx);
+
+       free(rw->prio_oids);
+       free(rw->oids);
+       rw->oids = new_oids;
+       rw->prio_oids = NULL;
+       rw->nr_prio_oids = 0;
+}
+
 static void recover_object_main(struct work *work)
 {
        struct recovery_work *rw = container_of(work, struct recovery_work,
@@ -388,11 +416,12 @@ static void recover_object_main(struct work *work)
                return;
        }
 
-       if (rw->nr_blocking > 0)
-               rw->nr_blocking--;
        resume_wait_obj_requests(rw->oids[rw->done++]);
 
        if (rw->done < rw->count) {
+               if (rw->nr_prio_oids)
+                       finish_schedule_oids(rw);
+
                /* Try recover next object */
                queue_work(sys->recovery_wqueue, &rw->work);
                return;
@@ -557,11 +586,13 @@ int start_recovery(struct vnode_info *cur_vnodes, struct 
vnode_info *old_vnodes)
        struct recovery_work *rw;
 
        rw = zalloc(sizeof(struct recovery_work));
-       if (!rw)
+       if (!rw) {
+               eprintf("%m\n");
                return -1;
+       }
 
        rw->state = RW_INIT;
-       rw->oids = malloc(1 << 20); /* FIXME */
+       rw->oids = xmalloc(1 << 20); /* FIXME */
        rw->epoch = sys->epoch;
        rw->count = 0;
 
-- 
1.7.10.2

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to