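This also fixes a thread-safety problem in get_sheep_fd(): its fd cache and cached epoch were static variables shared by all worker threads, so they could be updated concurrently. Both are now thread-local (__thread), which also makes the worker index argument unnecessary.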

Signed-off-by: MORITA Kazutaka <[email protected]>
---
 lib/coroutine.c           |   43 ++++++-------------------------------------
 sheep/cluster/accord.c    |    8 ++++----
 sheep/cluster/corosync.c  |    4 ++--
 sheep/cluster/local.c     |    4 ++--
 sheep/cluster/zookeeper.c |    4 ++--
 sheep/group.c             |    8 ++++----
 sheep/sdnet.c             |   40 +++++++++++++++++-----------------------
 sheep/sheep_priv.h        |    7 +++----
 sheep/store.c             |   36 +++++++++++++++++-------------------
 sheep/vdi.c               |    4 ++--
 sheep/work.c              |   14 +++-----------
 sheep/work.h              |    2 +-
 12 files changed, 63 insertions(+), 111 deletions(-)

diff --git a/lib/coroutine.c b/lib/coroutine.c
index 7ec9bba..5b2ed79 100644
--- a/lib/coroutine.c
+++ b/lib/coroutine.c
@@ -29,7 +29,6 @@
 #include <stdlib.h>
 #include <setjmp.h>
 #include <stdint.h>
-#include <pthread.h>
 #include <ucontext.h>
 #include <errno.h>
 #include <assert.h>
@@ -72,7 +71,7 @@ struct co_ucontext {
 /**
  * Per-thread coroutine bookkeeping
  */
-struct co_thread_state{
+__thread struct co_thread_state{
        /** Currently executing coroutine */
        struct coroutine *current;
 
@@ -82,9 +81,7 @@ struct co_thread_state{
 
        /** The default coroutine */
        struct co_ucontext leader;
-};
-
-static pthread_key_t thread_state_key;
+} co_ts;
 
 static enum co_action coroutine_switch(struct coroutine *from,
                                       struct coroutine *to,
@@ -102,43 +99,15 @@ union cc_arg {
 
 static struct co_thread_state *coroutine_get_thread_state(void)
 {
-       struct co_thread_state *s = pthread_getspecific(thread_state_key);
+       struct co_thread_state *s = &co_ts;
 
-       if (!s) {
-               s = zalloc(sizeof(*s));
-               if (!s)
-                       abort();
+       if (!s->current) {
                s->current = &s->leader.base;
                INIT_LIST_HEAD(&s->pool);
-               pthread_setspecific(thread_state_key, s);
        }
        return s;
 }
 
-static void coroutine_thread_cleanup(void *opaque)
-{
-       struct co_thread_state *s = opaque;
-       struct coroutine *co;
-       struct coroutine *tmp;
-
-       list_for_each_entry_safe(co, tmp, &s->pool, pool_next) {
-               free(container_of(co, struct co_ucontext, base)->stack);
-               free(co);
-       }
-       free(s);
-}
-
-static void __attribute__((constructor)) coroutine_init(void)
-{
-       int ret;
-
-       ret = pthread_key_create(&thread_state_key, coroutine_thread_cleanup);
-       if (ret != 0) {
-               fprintf(stderr, "unable to create leader key: %m\n");
-               abort();
-       }
-}
-
 static void coroutine_trampoline(int i0, int i1)
 {
        union cc_arg arg;
@@ -302,9 +271,9 @@ struct coroutine *coroutine_self(void)
 
 int in_coroutine(void)
 {
-       struct co_thread_state *s = pthread_getspecific(thread_state_key);
+       struct co_thread_state *s = &co_ts;
 
-       return s && s->current->caller;
+       return s->current && s->current->caller;
 }
 
 
diff --git a/sheep/cluster/accord.c b/sheep/cluster/accord.c
index a685f9e..cc762e3 100644
--- a/sheep/cluster/accord.c
+++ b/sheep/cluster/accord.c
@@ -391,7 +391,7 @@ struct acrd_leave_info {
        struct work work;
 };
 
-static void __acrd_leave(struct work *work, int idx)
+static void __acrd_leave(struct work *work)
 {
        struct acrd_leave_info *info = container_of(work, typeof(*info), work);
        struct acrd_handle *ah = info->ah;
@@ -425,7 +425,7 @@ static void __acrd_leave(struct work *work, int idx)
        pthread_mutex_unlock(&queue_lock);
 }
 
-static void __acrd_leave_done(struct work *work, int idx)
+static void __acrd_leave_done(struct work *work)
 {
        struct acrd_leave_info *info = container_of(work, typeof(*info), work);
 
@@ -536,7 +536,7 @@ static int accord_notify(void *msg, size_t msg_len, void 
(*block_cb)(void *arg))
        return add_event(ahandle, EVENT_NOTIFY, &this_node, msg, msg_len, 
block_cb);
 }
 
-static void acrd_block(struct work *work, int idx)
+static void acrd_block(struct work *work)
 {
        struct acrd_event ev;
 
@@ -552,7 +552,7 @@ static void acrd_block(struct work *work, int idx)
        pthread_mutex_unlock(&queue_lock);
 }
 
-static void acrd_block_done(struct work *work, int idx)
+static void acrd_block_done(struct work *work)
 {
 }
 
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index 4fe7704..a1d528a 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -209,14 +209,14 @@ retry:
        return 0;
 }
 
-static void corosync_block(struct work *work, int idx)
+static void corosync_block(struct work *work)
 {
        struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
 
        bm->cb(bm->msg);
 }
 
-static void corosync_block_done(struct work *work, int idx)
+static void corosync_block_done(struct work *work)
 {
        struct corosync_block_msg *bm = container_of(work, typeof(*bm), work);
 
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
index 84d4bb0..729741e 100644
--- a/sheep/cluster/local.c
+++ b/sheep/cluster/local.c
@@ -364,7 +364,7 @@ static int local_notify(void *msg, size_t msg_len, void 
(*block_cb)(void *arg))
        return 0;
 }
 
-static void local_block(struct work *work, int idx)
+static void local_block(struct work *work)
 {
        struct local_event *ev;
 
@@ -381,7 +381,7 @@ static void local_block(struct work *work, int idx)
        shm_queue_unlock();
 }
 
-static void local_block_done(struct work *work, int idx)
+static void local_block_done(struct work *work)
 {
 }
 
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 7717b6b..32c9fd2 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -441,7 +441,7 @@ static int zk_notify(void *msg, size_t msg_len, void 
(*block_cb)(void *arg))
        return add_event(zhandle, EVENT_NOTIFY, &this_node, msg, msg_len, 
block_cb);
 }
 
-static void zk_block(struct work *work, int idx)
+static void zk_block(struct work *work)
 {
        struct zk_event ev;
 
@@ -453,7 +453,7 @@ static void zk_block(struct work *work, int idx)
        zk_queue_push_back(zhandle, &ev);
 }
 
-static void zk_block_done(struct work *work, int idx)
+static void zk_block_done(struct work *work)
 {
 }
 
diff --git a/sheep/group.c b/sheep/group.c
index 2c075c5..ae9958e 100644
--- a/sheep/group.c
+++ b/sheep/group.c
@@ -215,7 +215,7 @@ static void do_cluster_op(void *arg)
        msg->rsp.result = ret;
 }
 
-void do_cluster_request(struct work *work, int idx)
+void do_cluster_request(struct work *work)
 {
        struct request *req = container_of(work, struct request, work);
        struct sd_req *hdr = (struct sd_req *)&req->rq;
@@ -886,7 +886,7 @@ static void cpg_event_free(struct cpg_event *cevent)
 
 static struct work cpg_event_work;
 
-static void cpg_event_fn(struct work *work, int idx)
+static void cpg_event_fn(struct work *work)
 {
        struct cpg_event *cevent = sys->cur_cevent;
 
@@ -913,7 +913,7 @@ static void cpg_event_fn(struct work *work, int idx)
        }
 }
 
-static void cpg_event_done(struct work *work, int idx)
+static void cpg_event_done(struct work *work)
 {
        struct cpg_event *cevent;
 
@@ -1132,7 +1132,7 @@ do_retry:
        while (!list_empty(&failed_req_list)) {
                struct request *req = list_first_entry(&failed_req_list,
                                                       struct request, r_wlist);
-               req->work.done(&req->work, 0);
+               req->work.done(&req->work);
 
                retry = 1;
        }
diff --git a/sheep/sdnet.c b/sheep/sdnet.c
index 510fd4e..5c97374 100644
--- a/sheep/sdnet.c
+++ b/sheep/sdnet.c
@@ -76,7 +76,7 @@ static void setup_access_to_local_objects(struct request *req)
                req->local_oid = hdr->oid;
 }
 
-static void io_op_done(struct work *work, int idx)
+static void io_op_done(struct work *work)
 {
        struct request *req = container_of(work, struct request, work);
        struct cpg_event *cevent = &req->cev;
@@ -167,7 +167,7 @@ done:
                req->done(req);
 }
 
-static void local_op_done(struct work *work, int idx)
+static void local_op_done(struct work *work)
 {
        struct request *req = container_of(work, struct request, work);
 
@@ -179,19 +179,17 @@ static void local_op_done(struct work *work, int idx)
        req->done(req);
 }
 
-static void cluster_op_done(struct work *work, int idx)
+static void cluster_op_done(struct work *work)
 {
        /* request is forwarded to cpg group */
 }
 
-static void do_local_request(struct work *work, int idx)
+static void do_local_request(struct work *work)
 {
        struct request *req = container_of(work, struct request, work);
        struct sd_obj_rsp *rsp = (struct sd_obj_rsp *)&req->rp;
        int ret = SD_RES_SUCCESS;
 
-       dprintf("%d\n", idx);
-
        if (has_process_work(req->op))
                ret = do_process_work(req->op, &req->rq, &req->rp, req->data);
 
@@ -793,20 +791,18 @@ int remove_object(struct sheepdog_vnode_list_entry *e,
        return 0;
 }
 
-int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx,
-                uint32_t epoch, int worker_idx)
+int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch)
 {
-       static int cached_fds[NR_GW_WORKER_THREAD][SD_MAX_NODES];
-       static uint32_t cached_epoch = 0;
-       int i, j, fd, ret;
+       static __thread int cached_fds[SD_MAX_NODES];
+       static __thread uint32_t cached_epoch = 0;
+       int i, fd, ret;
        char name[INET6_ADDRSTRLEN];
 
        if (cached_epoch == 0) {
                /* initialize */
-               for (i = 0; i < NR_GW_WORKER_THREAD; i++) {
-                       for (j = 0; j < SD_MAX_NODES; j++)
-                               cached_fds[i][j] = -1;
-               }
+               for (i = 0; i < SD_MAX_NODES; i++)
+                       cached_fds[i] = -1;
+
                cached_epoch = epoch;
        }
 
@@ -816,18 +812,16 @@ int get_sheep_fd(uint8_t *addr, uint16_t port, int 
node_idx,
                return -1;
        }
        if (after(epoch, cached_epoch)) {
-               for (i = 0; i < NR_GW_WORKER_THREAD; i++) {
-                       for (j = 0; j < SD_MAX_NODES; j++) {
-                               if (cached_fds[i][j] >= 0)
-                                       close(cached_fds[i][j]);
+               for (i = 0; i < SD_MAX_NODES; i++) {
+                       if (cached_fds[i] >= 0)
+                               close(cached_fds[i]);
 
-                               cached_fds[i][j] = -1;
-                       }
+                       cached_fds[i] = -1;
                }
                cached_epoch = epoch;
        }
 
-       fd = cached_fds[worker_idx][node_idx];
+       fd = cached_fds[node_idx];
        dprintf("%d, %d\n", epoch, fd);
 
        if (cached_epoch == epoch && fd >= 0) {
@@ -855,7 +849,7 @@ int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx,
                return -1;
        }
 
-       cached_fds[worker_idx][node_idx] = fd;
+       cached_fds[node_idx] = fd;
 
        return fd;
 }
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 7c61032..556d719 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -215,7 +215,7 @@ int create_cluster(int port, int64_t zone);
 int leave_cluster(void);
 
 void start_cpg_event_work(void);
-void do_io_request(struct work *work, int idx);
+void do_io_request(struct work *work);
 int write_object_local(uint64_t oid, char *data, unsigned int datalen,
                       uint64_t offset, uint16_t flags, int copies,
                       uint32_t epoch, int create);
@@ -224,7 +224,7 @@ int read_object_local(uint64_t oid, char *data, unsigned 
int datalen,
 
 int read_epoch(uint32_t *epoch, uint64_t *ctime,
               struct sheepdog_node_list_entry *entries, int *nr_entries);
-void do_cluster_request(struct work *work, int idx);
+void do_cluster_request(struct work *work);
 
 int update_epoch_store(uint32_t epoch);
 int update_epoch_log(int epoch);
@@ -268,8 +268,7 @@ int remove_object(struct sheepdog_vnode_list_entry *e,
                  int vnodes, int zones, uint32_t node_version,
                  uint64_t oid, int nr);
 
-int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx,
-                uint32_t epoch, int worker_idx);
+int get_sheep_fd(uint8_t *addr, uint16_t port, int node_idx, uint32_t epoch);
 
 /* Operations */
 
diff --git a/sheep/store.c b/sheep/store.c
index 78277a7..c0d7b62 100644
--- a/sheep/store.c
+++ b/sheep/store.c
@@ -214,7 +214,7 @@ out:
 
 static int do_local_io(struct request *req, uint32_t epoch);
 
-static int forward_read_obj_req(struct request *req, int idx)
+static int forward_read_obj_req(struct request *req)
 {
        int i, n, nr, fd, ret;
        unsigned wlen, rlen;
@@ -249,7 +249,7 @@ static int forward_read_obj_req(struct request *req, int 
idx)
 
        n = obj_to_sheep(e, nr, oid, 0);
 
-       fd = get_sheep_fd(e[n].addr, e[n].port, e[n].node_idx, hdr.epoch, idx);
+       fd = get_sheep_fd(e[n].addr, e[n].port, e[n].node_idx, hdr.epoch);
        if (fd < 0) {
                ret = SD_RES_NETWORK_ERROR;
                goto out;
@@ -270,7 +270,7 @@ out:
        return ret;
 }
 
-static int forward_write_obj_req(struct request *req, int idx)
+static int forward_write_obj_req(struct request *req)
 {
        int i, n, nr, fd, ret;
        unsigned wlen;
@@ -314,7 +314,7 @@ static int forward_write_obj_req(struct request *req, int 
idx)
                        continue;
                }
 
-               fd = get_sheep_fd(e[n].addr, e[n].port, e[n].node_idx, 
hdr.epoch, idx);
+               fd = get_sheep_fd(e[n].addr, e[n].port, e[n].node_idx, 
hdr.epoch);
                if (fd < 0) {
                        eprintf("failed to connect to %s:%"PRIu32"\n", name, 
e[n].port);
                        ret = SD_RES_NETWORK_ERROR;
@@ -666,7 +666,7 @@ static int do_local_io(struct request *req, uint32_t epoch)
        return ret;
 }
 
-static int fix_object_consistency(struct request *req, int idx)
+static int fix_object_consistency(struct request *req)
 {
        int ret = SD_RES_NO_MEM;
        unsigned int data_length;
@@ -697,7 +697,7 @@ static int fix_object_consistency(struct request *req, int 
idx)
        hdr->opcode = SD_OP_READ_OBJ;
        hdr->flags = 0;
        req->op = get_sd_op(SD_OP_READ_OBJ);
-       ret = forward_read_obj_req(req, idx);
+       ret = forward_read_obj_req(req);
        if (ret != SD_RES_SUCCESS) {
                eprintf("failed to read object %d\n", ret);
                goto out;
@@ -707,7 +707,7 @@ static int fix_object_consistency(struct request *req, int 
idx)
        hdr->flags = SD_FLAG_CMD_WRITE;
        hdr->oid = oid;
        req->op = get_sd_op(SD_OP_WRITE_OBJ);
-       ret = forward_write_obj_req(req, idx);
+       ret = forward_write_obj_req(req);
        if (ret != SD_RES_SUCCESS) {
                eprintf("failed to write object %d\n", ret);
                goto out;
@@ -722,7 +722,7 @@ out:
        return ret;
 }
 
-void do_io_request(struct work *work, int idx)
+void do_io_request(struct work *work)
 {
        struct request *req = container_of(work, struct request, work);
        int ret = SD_RES_SUCCESS;
@@ -732,7 +732,7 @@ void do_io_request(struct work *work, int idx)
        uint32_t opcode = hdr->opcode;
        uint32_t epoch = hdr->epoch;
 
-       dprintf("%"PRIu32", %x, %" PRIx64" , %u\n", idx, opcode, oid, epoch);
+       dprintf("%x, %" PRIx64" , %u\n", opcode, oid, epoch);
 
        if (hdr->flags & SD_FLAG_CMD_RECOVERY)
                epoch = hdr->tgt_epoch;
@@ -742,20 +742,20 @@ void do_io_request(struct work *work, int idx)
        } else {
                /* fix object consistency when we read the object for the first 
time */
                if (req->check_consistency) {
-                       ret = fix_object_consistency(req, idx);
+                       ret = fix_object_consistency(req);
                        if (ret != SD_RES_SUCCESS)
                                goto out;
                }
 
                if (hdr->flags & SD_FLAG_CMD_WRITE)
-                       ret = forward_write_obj_req(req, idx);
+                       ret = forward_write_obj_req(req);
                else
-                       ret = forward_read_obj_req(req, idx);
+                       ret = forward_read_obj_req(req);
        }
 out:
        if (ret != SD_RES_SUCCESS)
-               dprintf("failed: %"PRIu32", %x, %" PRIx64" , %u, %"PRIu32"\n",
-                       idx, opcode, oid, epoch, ret);
+               dprintf("failed: %x, %" PRIx64" , %u, %"PRIu32"\n",
+                       opcode, oid, epoch, ret);
        rsp->result = ret;
 }
 
@@ -1324,7 +1324,7 @@ err:
        return -1;
 }
 
-static void recover_one(struct work *work, int idx)
+static void recover_one(struct work *work)
 {
        struct recovery_work *rw = container_of(work, struct recovery_work, 
work);
        char *buf = NULL;
@@ -1403,8 +1403,6 @@ out:
 
 static struct recovery_work *suspended_recovery_work;
 
-static void __start_recovery(struct work *work, int idx);
-
 static void recover_timer(void *data)
 {
        struct recovery_work *rw = (struct recovery_work *)data;
@@ -1496,7 +1494,7 @@ int is_recoverying_oid(uint64_t oid)
        return 0;
 }
 
-static void recover_done(struct work *work, int idx)
+static void recover_done(struct work *work)
 {
        struct recovery_work *rw = container_of(work, struct recovery_work, 
work);
        uint64_t oid;
@@ -1698,7 +1696,7 @@ fail:
        return -1;
 }
 
-static void __start_recovery(struct work *work, int idx)
+static void __start_recovery(struct work *work)
 {
        struct recovery_work *rw = container_of(work, struct recovery_work, 
work);
        uint32_t epoch = rw->epoch;
diff --git a/sheep/vdi.c b/sheep/vdi.c
index f00ce04..a3b49fc 100644
--- a/sheep/vdi.c
+++ b/sheep/vdi.c
@@ -430,7 +430,7 @@ struct deletion_work {
 
 static LIST_HEAD(deletion_work_list);
 
-static void delete_one(struct work *work, int idx)
+static void delete_one(struct work *work)
 {
        struct deletion_work *dw = container_of(work, struct deletion_work, 
work);
        uint32_t vdi_id = *(((uint32_t *)dw->buf) + dw->count - dw->done - 1);
@@ -478,7 +478,7 @@ out:
        free(inode);
 }
 
-static void delete_one_done(struct work *work, int idx)
+static void delete_one_done(struct work *work)
 {
        struct deletion_work *dw = container_of(work, struct deletion_work, 
work);
 
diff --git a/sheep/work.c b/sheep/work.c
index f33b914..789272e 100644
--- a/sheep/work.c
+++ b/sheep/work.c
@@ -182,7 +182,7 @@ static void bs_thread_request_done(int fd, int events, void 
*data)
                         * save its attr for qork_post_done().
                         */
                        attr = work->attr;
-                       work->done(work, 0);
+                       work->done(work);
                        work_post_done(&wi->q, attr);
                }
        }
@@ -192,18 +192,10 @@ static void *worker_routine(void *arg)
 {
        struct worker_info *wi = arg;
        struct work *work;
-       int i, idx = 0;
        eventfd_t value = 1;
 
-       for (i = 0; i < wi->nr_threads; i++) {
-               if (wi->worker_thread[i] == pthread_self()) {
-                       idx = i;
-                       break;
-               }
-       }
-
        pthread_mutex_lock(&wi->startup_lock);
-       dprintf("started this thread %d\n", idx);
+       /* started this thread */
        pthread_mutex_unlock(&wi->startup_lock);
 
        while (!(wi->q.wq_state & WQ_DEAD)) {
@@ -225,7 +217,7 @@ retest:
                list_del(&work->w_list);
                pthread_mutex_unlock(&wi->pending_lock);
 
-               work->fn(work, idx);
+               work->fn(work);
 
                pthread_mutex_lock(&wi->finished_lock);
                list_add_tail(&work->w_list, &wi->finished_list);
diff --git a/sheep/work.h b/sheep/work.h
index a980600..9ef9936 100644
--- a/sheep/work.h
+++ b/sheep/work.h
@@ -4,7 +4,7 @@
 struct work;
 struct work_queue;
 
-typedef void (*work_func_t)(struct work *, int idx);
+typedef void (*work_func_t)(struct work *);
 
 enum work_attr {
        WORK_SIMPLE,
-- 
1.7.2.5

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to