This declares current_vnode_info, pending_block_list,
pending_notify_list, and recovering_work as thread_unsafe variables, so
that every read and write of them goes through the
thread_unsafe_get()/thread_unsafe_set() accessors. As part of this, the
two pending lists move out of struct cluster_info (formerly
sys->pending_block_list and sys->pending_notify_list) and become
static, heap-allocated list heads in group.c.

Signed-off-by: MORITA Kazutaka <[email protected]>
---
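Note for reviewers (below the "---", so not part of the commit):
thread_unsafe() is the tree's wrapper for main-thread-only globals. As
a rough mental model, a macro of this shape could be implemented as
sketched below. This is an illustration only, not the actual sheepdog
definition; in_main_thread() and main_thread_id are hypothetical
stand-ins for whatever main-thread check the tree provides.

#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

static pthread_t main_thread_id;	/* assumed recorded at startup */

static inline bool in_main_thread(void)
{
	return pthread_equal(pthread_self(), main_thread_id);
}

/*
 * Wrap the value in an anonymous struct so that a plain read or
 * assignment of the old pointer type no longer compiles; all access
 * has to go through the accessors below.
 */
#define thread_unsafe(type)			\
	struct {				\
		type __val;			\
	}

/*
 * Both accessors assert that the caller runs in the main thread, so an
 * accidental cross-thread access trips immediately in debug runs.
 * (thread_unsafe_get uses a GCC statement expression.)
 */
#define thread_unsafe_get(var)			\
	({					\
		assert(in_main_thread());	\
		(var).__val;			\
	})

#define thread_unsafe_set(var, val)		\
	do {					\
		assert(in_main_thread());	\
		(var).__val = (val);		\
	} while (0)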
 sheep/group.c      | 92 +++++++++++++++++++++++++++++++++---------------------
 sheep/recovery.c   | 27 +++++++++-------
 sheep/sheep_priv.h |  3 --
 3 files changed, 72 insertions(+), 50 deletions(-)
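
The conversion at a typical call site looks like this (taken from the
have_enough_zones() hunk below); the pattern throughout is to fetch the
pointer once into a local through the checked accessor and then use the
plain local:

/* before: bare global, nothing stops an off-main-thread access */
if (!current_vnode_info)
	return false;

/* after: one checked fetch into a local, then ordinary pointer use */
struct vnode_info *cur_vinfo = thread_unsafe_get(current_vnode_info);
if (!cur_vinfo)
	return false;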

diff --git a/sheep/group.c b/sheep/group.c
index ee23588..17c4123 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -45,7 +45,9 @@ static pthread_mutex_t wait_vdis_lock = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t wait_vdis_cond = PTHREAD_COND_INITIALIZER;
 static bool is_vdi_list_ready = true;
 
-static struct vnode_info *current_vnode_info;
+static thread_unsafe(struct vnode_info *) current_vnode_info;
+static thread_unsafe(struct list_head *) pending_block_list;
+static thread_unsafe(struct list_head *) pending_notify_list;
 
 static size_t get_join_message_size(struct join_message *jm)
 {
@@ -87,26 +89,27 @@ static int get_zones_nr_from(const struct sd_node *nodes, int nr_nodes)
 bool have_enough_zones(void)
 {
        int max_copies;
+       struct vnode_info *cur_vinfo = thread_unsafe_get(current_vnode_info);
 
        if (sys->flags & SD_FLAG_NOHALT)
                return true;
 
-       if (!current_vnode_info)
+       if (!cur_vinfo)
                return false;
 
        max_copies = get_max_copy_number();
 
        sd_dprintf("flags %d, nr_zones %d, min copies %d",
-                  sys->flags, current_vnode_info->nr_zones, max_copies);
+                  sys->flags, cur_vinfo->nr_zones, max_copies);
 
-       if (!current_vnode_info->nr_zones)
+       if (!cur_vinfo->nr_zones)
                return false;
 
        if (sys->flags & SD_FLAG_QUORUM) {
-               if (current_vnode_info->nr_zones > (max_copies/2))
+               if (cur_vinfo->nr_zones > (max_copies/2))
                        return true;
        } else {
-               if (current_vnode_info->nr_zones >= max_copies)
+               if (cur_vinfo->nr_zones >= max_copies)
                        return true;
        }
        return false;
@@ -143,9 +146,11 @@ struct vnode_info *grab_vnode_info(struct vnode_info *vnode_info)
  */
 struct vnode_info *get_vnode_info(void)
 {
-       assert(current_vnode_info);
+       struct vnode_info *cur_vinfo = thread_unsafe_get(current_vnode_info);
 
-       return grab_vnode_info(current_vnode_info);
+       assert(cur_vinfo);
+
+       return grab_vnode_info(cur_vinfo);
 }
 
 /* Release a reference to the current vnode information. */
@@ -200,15 +205,15 @@ int local_get_node_list(const struct sd_req *req, struct sd_rsp *rsp,
 {
        struct sd_node_rsp *node_rsp = (struct sd_node_rsp *)rsp;
        int nr_nodes;
+       struct vnode_info *cur_vinfo = thread_unsafe_get(current_vnode_info);
 
-       if (current_vnode_info) {
-               nr_nodes = current_vnode_info->nr_nodes;
-               memcpy(data, current_vnode_info->nodes,
+       if (cur_vinfo) {
+               nr_nodes = cur_vinfo->nr_nodes;
+               memcpy(data, cur_vinfo->nodes,
                        sizeof(struct sd_node) * nr_nodes);
                node_rsp->data_length = nr_nodes * sizeof(struct sd_node);
                node_rsp->nr_nodes = nr_nodes;
-               node_rsp->local_idx = get_node_idx(current_vnode_info,
-                                                  &sys->this_node);
+               node_rsp->local_idx = get_node_idx(cur_vinfo, &sys->this_node);
        } else {
                node_rsp->nr_nodes = 0;
                node_rsp->local_idx = 0;
@@ -282,7 +287,7 @@ bool sd_block_handler(const struct sd_node *sender)
 
        cluster_op_running = true;
 
-       req = list_first_entry(&sys->pending_block_list,
+       req = list_first_entry(thread_unsafe_get(pending_block_list),
                                struct request, pending_list);
        req->work.fn = do_process_work;
        req->work.done = cluster_op_done;
@@ -303,14 +308,16 @@ void queue_cluster_request(struct request *req)
        sd_dprintf("%s (%p)", op_name(req->op), req);
 
        if (has_process_work(req->op)) {
-               list_add_tail(&req->pending_list, &sys->pending_block_list);
+               list_add_tail(&req->pending_list,
+                             thread_unsafe_get(pending_block_list));
                sys->cdrv->block();
        } else {
                struct vdi_op_message *msg;
                size_t size;
 
                msg = prepare_cluster_msg(req, &size);
-               list_add_tail(&req->pending_list, &sys->pending_notify_list);
+               list_add_tail(&req->pending_list,
+                             thread_unsafe_get(pending_notify_list));
 
                msg->rsp.result = SD_RES_SUCCESS;
                sys->cdrv->notify(msg, size);
@@ -538,6 +545,7 @@ static int cluster_wait_for_join_check(const struct sd_node *joined,
        int nr, nr_local_entries, nr_failed_entries, nr_delayed_nodes;
        uint32_t local_epoch = get_latest_epoch();
        int ret;
+       struct vnode_info *cur_vinfo;
 
        if (jm->nr_nodes == 0)
                return CJ_RES_JOIN_LATER;
@@ -580,10 +588,11 @@ static int cluster_wait_for_join_check(const struct sd_node *joined,
                return CJ_RES_FAIL;
        }
 
-       if (!current_vnode_info)
+       cur_vinfo = thread_unsafe_get(current_vnode_info);
+       if (!cur_vinfo)
                nr = 1;
        else
-               nr = current_vnode_info->nr_nodes + 1;
+               nr = cur_vinfo->nr_nodes + 1;
 
        nr_delayed_nodes = get_nodes_nr_from(&sys->delayed_nodes);
 
@@ -704,10 +713,12 @@ static void get_vdis_done(struct work *work)
 
 int log_current_epoch(void)
 {
-       if (!current_vnode_info)
+       struct vnode_info *cur_vinfo = thread_unsafe_get(current_vnode_info);
+
+       if (!cur_vinfo)
                return update_epoch_log(sys->epoch, NULL, 0);
-       return update_epoch_log(sys->epoch, current_vnode_info->nodes,
-                               current_vnode_info->nr_nodes);
+       return update_epoch_log(sys->epoch, cur_vinfo->nodes,
+                               cur_vinfo->nr_nodes);
 }
 
 static struct vnode_info *alloc_old_vnode_info(const struct sd_node *joined,
@@ -852,8 +863,9 @@ static void update_cluster_info(const struct join_message *msg,
        if (!sys->join_finished)
                finish_join(msg, joined, nodes, nr_nodes);
 
-       old_vnode_info = current_vnode_info;
-       current_vnode_info = alloc_vnode_info(nodes, nr_nodes);
+       old_vnode_info = thread_unsafe_get(current_vnode_info);
+       thread_unsafe_set(current_vnode_info,
+                         alloc_vnode_info(nodes, nr_nodes));
 
        switch (msg->cluster_status) {
        case SD_STATUS_OK:
@@ -887,7 +899,8 @@ static void update_cluster_info(const struct join_message *msg,
                                                nodes, nr_nodes);
                        }
 
-                       start_recovery(current_vnode_info, old_vnode_info);
+                       start_recovery(thread_unsafe_get(current_vnode_info),
+                                      old_vnode_info);
                }
 
                if (have_enough_zones())
@@ -922,11 +935,13 @@ void sd_notify_handler(const struct sd_node *sender, void *data,
 
        if (node_is_local(sender)) {
                if (has_process_work(op))
-                       req = list_first_entry(&sys->pending_block_list,
-                                              struct request, pending_list);
+                       req = list_first_entry(
+                               thread_unsafe_get(pending_block_list),
+                               struct request, pending_list);
                else
-                       req = list_first_entry(&sys->pending_notify_list,
-                                              struct request, pending_list);
+                       req = list_first_entry(
+                               thread_unsafe_get(pending_notify_list),
+                               struct request, pending_list);
                list_del(&req->pending_list);
        }
 
@@ -1135,8 +1150,9 @@ void sd_join_handler(const struct sd_node *joined,
                        sys->join_finished = true;
                        sys->epoch = get_latest_epoch();
 
-                       put_vnode_info(current_vnode_info);
-                       current_vnode_info = alloc_vnode_info(&sys->this_node, 1);
+                       put_vnode_info(thread_unsafe_get(current_vnode_info));
+                       thread_unsafe_set(current_vnode_info,
+                                         alloc_vnode_info(&sys->this_node, 1));
                }
 
                nr_local = get_nodes_nr_epoch(sys->epoch);
@@ -1178,14 +1194,16 @@ void sd_leave_handler(const struct sd_node *left, const struct sd_node *members,
                /* Mark leave node as gateway only node */
                sys->this_node.nr_vnodes = 0;
 
-       old_vnode_info = current_vnode_info;
-       current_vnode_info = alloc_vnode_info(members, nr_members);
+       old_vnode_info = thread_unsafe_get(current_vnode_info);
+       thread_unsafe_set(current_vnode_info,
+                         alloc_vnode_info(members, nr_members));
        switch (sys->status) {
        case SD_STATUS_HALT:
        case SD_STATUS_OK:
                uatomic_inc(&sys->epoch);
                log_current_epoch();
-               start_recovery(current_vnode_info, old_vnode_info);
+               start_recovery(thread_unsafe_get(current_vnode_info),
+                              old_vnode_info);
                if (!have_enough_zones())
                        sys->status = SD_STATUS_HALT;
                break;
@@ -1249,8 +1267,12 @@ int create_cluster(int port, int64_t zone, int nr_vnodes,
                sys->status = SD_STATUS_WAIT_FOR_FORMAT;
        }
 
-       INIT_LIST_HEAD(&sys->pending_block_list);
-       INIT_LIST_HEAD(&sys->pending_notify_list);
+       thread_unsafe_set(pending_block_list,
+                         xzalloc(sizeof(struct list_head)));
+       INIT_LIST_HEAD(thread_unsafe_get(pending_block_list));
+       thread_unsafe_set(pending_notify_list,
+                         xzalloc(sizeof(struct list_head)));
+       INIT_LIST_HEAD(thread_unsafe_get(pending_notify_list));
        INIT_LIST_HEAD(&sys->failed_nodes);
        INIT_LIST_HEAD(&sys->delayed_nodes);
 
diff --git a/sheep/recovery.c b/sheep/recovery.c
index 0be846c..57ea38a 100644
--- a/sheep/recovery.c
+++ b/sheep/recovery.c
@@ -46,8 +46,9 @@ struct recovery_work {
        struct vnode_info *cur_vinfo;
 };
 
-static struct recovery_work *next_rw;
-static struct recovery_work *recovering_work;
+struct recovery_work *next_rw;
+static thread_unsafe(struct recovery_work *) recovering_work;
+
 /* Dynamically grown list buffer default as 4M (2T storage) */
 #define DEFAULT_LIST_BUFFER_SIZE (UINT64_C(1) << 22)
 static size_t list_buffer_size = DEFAULT_LIST_BUFFER_SIZE;
@@ -215,12 +216,12 @@ static void recover_object_work(struct work *work)
 
 bool node_in_recovery(void)
 {
-       return !!recovering_work;
+       return thread_unsafe_get(recovering_work) != NULL;
 }
 
 static inline void prepare_schedule_oid(uint64_t oid)
 {
-       struct recovery_work *rw = recovering_work;
+       struct recovery_work *rw = thread_unsafe_get(recovering_work);
        int i;
 
        for (i = 0; i < rw->nr_prio_oids; i++)
@@ -253,7 +254,7 @@ static inline void prepare_schedule_oid(uint64_t oid)
 
 bool oid_in_recovery(uint64_t oid)
 {
-       struct recovery_work *rw = recovering_work;
+       struct recovery_work *rw = thread_unsafe_get(recovering_work);
        int i;
 
        if (!node_in_recovery())
@@ -310,7 +311,7 @@ static inline bool run_next_rw(struct recovery_work *rw)
                return false;
 
        free_recovery_work(rw);
-       recovering_work = nrw;
+       thread_unsafe_set(recovering_work, nrw);
        wakeup_all_requests();
        queue_work(sys->recovery_wqueue, &nrw->work);
        sd_dprintf("recovery work is superseded");
@@ -345,7 +346,7 @@ static void notify_recovery_completion_main(struct work *work)
 static inline void finish_recovery(struct recovery_work *rw)
 {
        uint32_t recovered_epoch = rw->epoch;
-       recovering_work = NULL;
+       thread_unsafe_set(recovering_work, NULL);
 
        if (sd_store->end_recover)
                sd_store->end_recover(sys->epoch - 1, rw->old_vinfo);
@@ -445,9 +446,11 @@ static void recover_next_object(struct recovery_work *rw)
 
 void resume_suspended_recovery(void)
 {
-       if (recovering_work && recovering_work->suspended) {
-               recovering_work->suspended = false;
-               recover_next_object(recovering_work);
+       struct recovery_work *rw = thread_unsafe_get(recovering_work);
+
+       if (rw && rw->suspended) {
+               rw->suspended = false;
+               recover_next_object(rw);
        }
 }
 
@@ -647,7 +650,7 @@ int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo)
        rw->work.fn = prepare_object_list;
        rw->work.done = finish_object_list;
 
-       if (recovering_work != NULL) {
+       if (thread_unsafe_get(recovering_work) != NULL) {
                /* skip the previous epoch recovery */
                struct recovery_work *nrw = uatomic_xchg_ptr(&next_rw, rw);
                if (nrw)
@@ -660,7 +663,7 @@ int start_recovery(struct vnode_info *cur_vinfo, struct vnode_info *old_vinfo)
                 */
                resume_suspended_recovery();
        } else {
-               recovering_work = rw;
+               thread_unsafe_set(recovering_work, rw);
                queue_work(sys->recovery_wqueue, &rw->work);
        }
 out:
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 0f66613..155f584 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -88,9 +88,6 @@ struct cluster_info {
         */
        struct list_head delayed_nodes;
 
-       struct list_head pending_block_list;
-       struct list_head pending_notify_list;
-
        DECLARE_BITMAP(vdi_inuse, SD_NR_VDIS);
 
        uint8_t nr_copies;
-- 
1.8.1.3.566.gaa39828
