If clients access unrecovered objects, Sheepdog should recover them
first.  This fixes a bug which occurs when there is more than one such
object.

Signed-off-by: MORITA Kazutaka <[email protected]>
---
 sheep/store.c |   47 +++++++++++++++++++++++++++++------------------
 1 files changed, 29 insertions(+), 18 deletions(-)

diff --git a/sheep/store.c b/sheep/store.c
index 9f04ccc..c229679 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -1054,13 +1054,13 @@ struct recovery_work {
        struct work work;
        struct list_head rw_siblings;
 
+       int nr_blocking;
        int count;
        uint64_t *oids;
 };
 
 static LIST_HEAD(recovery_work_list);
 static struct recovery_work *recovering_work;
-static uint64_t blocking_oid;
 
 /*
  * find_tgt_node - find the node from which we should recover objects
@@ -1334,9 +1334,6 @@ static void recover_one(struct work *work, int idx)
 
        eprintf("%"PRIu32" %"PRIu32", %16"PRIx64"\n", rw->done, rw->count, oid);
 
-       if (blocking_oid)
-               oid = blocking_oid; /* recover the blocked object first */
-
        fd = ob_open(epoch, oid, 0, &ret);
        if (fd != -1) {
                /* the object is already recovered */
@@ -1417,7 +1414,6 @@ out:
 }
 
 static struct recovery_work *suspended_recovery_work;
-static uint64_t recovering_oid;
 
 static void __start_recovery(struct work *work, int idx);
 
@@ -1431,7 +1427,6 @@ static void recover_timer(void *data)
                return;
        }
 
-       recovering_oid = oid;
        queue_work(&rw->work);
 }
 
@@ -1450,16 +1445,15 @@ void resume_recovery_work(void)
                return;
 
        suspended_recovery_work = NULL;
-       recovering_oid = oid;
        queue_work(&rw->work);
 }
 
 int is_recoverying_oid(uint64_t oid)
 {
        uint64_t hval = fnv_64a_buf(&oid, sizeof(uint64_t), FNV1A_64_INIT);
-       uint64_t recovering_hval = fnv_64a_buf(&recovering_oid, 
sizeof(uint64_t), FNV1A_64_INIT);
+       uint64_t min_hval;
        struct recovery_work *rw = recovering_work;
-       int ret, fd;
+       int ret, fd, i;
 
        if (oid == 0)
                return 0;
@@ -1467,6 +1461,8 @@ int is_recoverying_oid(uint64_t oid)
        if (!rw)
                return 0; /* there is no thread working for object recovery */
 
+       min_hval = fnv_64a_buf(&rw->oids[rw->done + rw->nr_blocking], 
sizeof(uint64_t), FNV1A_64_INIT);
+
        if (before(rw->epoch, sys->epoch))
                return 1;
 
@@ -1480,11 +1476,27 @@ int is_recoverying_oid(uint64_t oid)
                return 0;
        }
 
-       if (recovering_hval <= hval) {
-               if (bsearch(&oid, rw->oids + rw->done,
-                           rw->count - rw->done, sizeof(oid), obj_cmp)) {
+       /* the first 'rw->nr_blocking' objects were already scheduled to be 
done earlier */
+       for (i = 0; i < rw->nr_blocking; i++)
+               if (rw->oids[rw->done + i] == oid)
+                       return 1;
+
+       if (min_hval <= hval) {
+               uint64_t *p;
+               p = bsearch(&oid, rw->oids + rw->done + rw->nr_blocking,
+                           rw->count - rw->done - rw->nr_blocking, 
sizeof(oid), obj_cmp);
+               if (p) {
                        dprintf("recover the object %" PRIx64 " first\n", oid);
-                       blocking_oid = oid;
+                       if (rw->nr_blocking == 0)
+                               rw->nr_blocking = 1; /* the first oid may be 
processed now */
+                       if (p > rw->oids + rw->done + rw->nr_blocking) {
+                               /* this object should be recovered earlier */
+                               memmove(rw->oids + rw->done + rw->nr_blocking + 
1,
+                                       rw->oids + rw->done + rw->nr_blocking,
+                                       sizeof(uint64_t) * (p - (rw->oids + 
rw->done + rw->nr_blocking)));
+                               rw->oids[rw->done + rw->nr_blocking] = oid;
+                               rw->nr_blocking++;
+                       }
                        return 1;
                }
        }
@@ -1500,8 +1512,11 @@ static void recover_done(struct work *work, int idx)
 
        if (rw->state == RW_INIT)
                rw->state = RW_RUN;
-       else if (!rw->retry)
+       else if (!rw->retry) {
                rw->done++;
+               if (rw->nr_blocking > 0)
+                       rw->nr_blocking--;
+       }
 
        oid = rw->oids[rw->done];
 
@@ -1514,8 +1529,6 @@ static void recover_done(struct work *work, int idx)
                return;
        }
 
-       blocking_oid = 0;
-
        if (rw->done < rw->count && list_empty(&recovery_work_list)) {
                rw->work.fn = recover_one;
 
@@ -1523,14 +1536,12 @@ static void recover_done(struct work *work, int idx)
                        suspended_recovery_work = rw;
                        return;
                }
-               recovering_oid = oid;
                resume_pending_requests();
                queue_work(&rw->work);
                return;
        }
 
        dprintf("recovery done, %"PRIu32"\n", rw->epoch);
-       recovering_oid = 0;
        recovering_work = NULL;
 
        sys->recovered_epoch = rw->epoch;
-- 
1.5.6.5

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to