Implement the distributed lock by zookeeper (refer: 
http://zookeeper.apache.org/doc/trunk/recipes.html)
The routine is:
        1. create a seq-ephemeral znode in lock directory (use lock-id as dir 
name)
        2. the thread that created the znode with the smallest sequence number 
becomes the owner of the lock; the other threads wait 
on a pthread_mutex_t
        3. when the owner of the lock releases it (or the owner is killed by 
accident), zookeeper will
           trigger zk_watch(), which will wake up all waiting threads to compete 
for new ownership of the lock

We add ->local_lock to dist_mutex to prevent many threads in one sheepdog daemon 
from creating too many files
in a lock directory.

v2 --> v3:
        1. change cluster interface to init_lock()/lock()/unlock()
        2. change 'struct zk_mutex' to 'struct cluster_lock'
        3. add empty implementation to local/corosync

v1 --> v2:
        move code from sheep/http/lock.c into sheep/cluster/zookeeper.c using 
cluster framework

Signed-off-by: Robin Dong <[email protected]>
---
 sheep/cluster.h           |   25 ++++++++
 sheep/cluster/corosync.c  |   16 +++++
 sheep/cluster/local.c     |   16 +++++
 sheep/cluster/zookeeper.c |  140 ++++++++++++++++++++++++++++++++++++++++++++-
 4 files changed, 196 insertions(+), 1 deletions(-)

diff --git a/sheep/cluster.h b/sheep/cluster.h
index 81b5ae4..260573c 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -23,6 +23,13 @@
 #include "sheep.h"
 #include "config.h"
 
+/* One distributed lock instance, identified cluster-wide by ->id. */
+struct cluster_lock {
+       uint32_t id;            /* id of this mutex */
+       /* waiting threads sleep here until the znode watcher wakes them */
+       pthread_mutex_t wait;
+       /* serializes lock users inside one daemon so each daemon keeps
+        * at most one seq-ephemeral znode per lock directory */
+       pthread_mutex_t local_lock;
+       /* path of the seq-ephemeral znode created by this daemon */
+       char ephemeral_path[MAX_NODE_STR_LEN];
+};
+
 /*
  * maximum payload size sent in ->notify and ->unblock, it should be large
  * enough to support COROSYNC_MAX_NODES * struct sd_node
@@ -109,6 +116,24 @@ struct cluster_driver {
        int (*unblock)(void *msg, size_t msg_len);
 
        /*
+        * Init a distributed mutually exclusive lock to avoid race condition
+        * when using swift interface to add/delete/list object.
+        *
+        * Returns SD_RES_XXX
+        */
+       int (*init_lock)(struct cluster_lock *lock, uint32_t id);
+
+       /*
+        * Get the distributed lock.
+        *
+        * It will never return if it can't acquire the lock.
+        */
+       void (*lock)(struct cluster_lock *lock);
+
+       /* Release the distributed lock. */
+       void (*unlock)(struct cluster_lock *lock);
+
+       /*
         * Update the specific node in the driver's private copy of nodes
         *
         * Returns SD_RES_XXX
diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
index ea4421b..35c95d3 100644
--- a/sheep/cluster/corosync.c
+++ b/sheep/cluster/corosync.c
@@ -774,6 +774,19 @@ again:
        return 0;
 }
 
+/* Distributed locks are not implemented for the corosync driver. */
+static int corosync_init_lock(struct cluster_lock *dlock, uint32_t id)
+{
+       /* NOTE(review): cluster.h says "Returns SD_RES_XXX" but this
+        * returns a bare -1 — confirm callers only test for non-zero */
+       return -1;
+}
+
+/* No-op: corosync driver has no distributed lock support. */
+static void corosync_lock(struct cluster_lock *dlock)
+{
+}
+
+/* No-op: corosync driver has no distributed lock support. */
+static void corosync_unlock(struct cluster_lock *dlock)
+{
+}
+
 static int corosync_update_node(struct sd_node *node)
 {
        struct cpg_node cnode = this_node;
@@ -794,6 +807,9 @@ static struct cluster_driver cdrv_corosync = {
        .notify         = corosync_notify,
        .block          = corosync_block,
        .unblock        = corosync_unblock,
+       .init_lock      = corosync_init_lock,
+       .lock           = corosync_lock,
+       .unlock         = corosync_unlock,
        .update_node    = corosync_update_node,
 };
 
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
index b8cbb5c..d71cd20 100644
--- a/sheep/cluster/local.c
+++ b/sheep/cluster/local.c
@@ -547,6 +547,19 @@ static int local_init(const char *option)
        return 0;
 }
 
+/* Distributed locks are not implemented for the local driver. */
+static int local_init_lock(struct cluster_lock *dlock, uint32_t id)
+{
+       /* NOTE(review): cluster.h says "Returns SD_RES_XXX" but this
+        * returns a bare -1 — confirm callers only test for non-zero */
+       return -1;
+}
+
+/* No-op: local driver has no distributed lock support. */
+static void local_lock(struct cluster_lock *dlock)
+{
+}
+
+/* No-op: local driver has no distributed lock support. */
+static void local_unlock(struct cluster_lock *dlock)
+{
+}
+
 static int local_update_node(struct sd_node *node)
 {
        struct local_node lnode = this_node;
@@ -566,6 +579,9 @@ static struct cluster_driver cdrv_local = {
        .notify         = local_notify,
        .block          = local_block,
        .unblock        = local_unblock,
+       .init_lock      = local_init_lock,
+       .lock           = local_lock,
+       .unlock         = local_unlock,
        .update_node    = local_update_node,
 };
 
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index fa89c46..fc6c207 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -30,6 +30,21 @@
 #define QUEUE_ZNODE BASE_ZNODE "/queue"
 #define MEMBER_ZNODE BASE_ZNODE "/member"
 #define MASTER_ZNONE BASE_ZNODE "/master"
+#define LOCK_ZNODE BASE_ZNODE "/lock"
+
+#define MAX_MUTEX_NR   4096
+#define WAIT_TIME      1               /* second */
+
+static struct cluster_lock **dlock_array;
+
+/*
+ * Sleep briefly when create, delete or get_children fails on a
+ * zookeeper lock, so the retry loops do not flood the log.
+ */
+static void zk_wait(void)
+{
+       sleep(WAIT_TIME);
+}
 
 /* iterate child znodes */
 #define FOR_EACH_ZNODE(parent, path, strs)                            \
@@ -506,6 +521,7 @@ static void zk_watcher(zhandle_t *zh, int type, int state, 
const char *path,
 {
        struct zk_node znode;
        char str[MAX_NODE_STR_LEN], *p;
+       uint32_t lock_id;
        int ret;
 
        if (type == ZOO_SESSION_EVENT && state == ZOO_EXPIRED_SESSION_STATE) {
@@ -528,6 +544,14 @@ static void zk_watcher(zhandle_t *zh, int type, int state, 
const char *path,
        } else if (type == ZOO_DELETED_EVENT) {
                struct zk_node *n;
 
+               /* process distributed lock */
+               ret = sscanf(path, LOCK_ZNODE "/%u/%s", &lock_id, str);
+               if (ret == 2 && lock_id < MAX_MUTEX_NR &&
+                   dlock_array && dlock_array[lock_id]) {
+                       pthread_mutex_unlock(&(dlock_array[lock_id]->wait));
+                       sd_debug("release lock %u %s", lock_id, str);
+               }
+
                ret = sscanf(path, MASTER_ZNONE "/%s", str);
                if (ret == 1) {
                        zk_compete_master();
@@ -1058,6 +1082,108 @@ kick_block_event:
        kick_block_event();
 }
 
+/*
+ * Init a distributed lock: register it in dlock_array, create its lock
+ * directory znode and init the two local mutexes.
+ *
+ * Returns 0 on success, non-zero on failure.
+ */
+static int zk_init_lock(struct cluster_lock *dlock, uint32_t id)
+{
+       int rc;
+       char path[MAX_NODE_STR_LEN];
+
+       /*
+        * 'id' indexes dlock_array (MAX_MUTEX_NR slots), so any
+        * id >= MAX_MUTEX_NR is out of range.  The old '>' test let
+        * id == MAX_MUTEX_NR through and wrote one element past the
+        * end of the array (both here and on the err: path); this also
+        * matches the 'lock_id < MAX_MUTEX_NR' check in zk_watcher().
+        */
+       if (id >= MAX_MUTEX_NR) {
+               sd_err("lock-id is too large!");
+               return -1;
+       }
+
+       dlock_array[id] = dlock;
+       dlock->id = id;
+       snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%u", dlock->id);
+       rc = zk_init_node(path);
+       if (rc)
+               goto err;
+
+       rc = pthread_mutex_init(&dlock->wait, NULL);
+       if (rc) {
+               sd_err("failed to init dlock->wait");
+               goto err;
+       }
+
+       rc = pthread_mutex_init(&dlock->local_lock, NULL);
+       if (rc) {
+               sd_err("failed to init dlock->local_lock");
+               /* don't leak the already-initialized 'wait' mutex */
+               pthread_mutex_destroy(&dlock->wait);
+               goto err;
+       }
+
+       return 0;
+err:
+       /* only reached with a validated id, so this store is in bounds */
+       dlock_array[id] = NULL;
+       return rc;
+}
+
+/*
+ * Acquire the distributed lock; never returns until this thread owns it.
+ *
+ * Follows the standard zookeeper lock recipe: create a seq-ephemeral
+ * znode under the lock directory; the creator of the znode with the
+ * least sequence number owns the lock, everyone else waits for the
+ * current least znode to be deleted.
+ */
+static void zk_lock(struct cluster_lock *dlock)
+{
+       int flags = ZOO_SEQUENCE | ZOO_EPHEMERAL;
+       int rc, len = MAX_NODE_STR_LEN;
+       char *my_path;
+       char parent[MAX_NODE_STR_LEN];
+       char lowest_seq_path[MAX_NODE_STR_LEN];
+       char owner_name[MAX_NODE_STR_LEN];
+
+       /*
+        * if many threads use locks with same id, we should use
+        * ->local_lock to avoid the only zookeeper handler to
+        * create many seq-ephemeral files.
+        */
+       pthread_mutex_lock(&dlock->local_lock);
+
+       my_path = dlock->ephemeral_path;
+
+       /* compete owner of lock is just like zk_compete_master() */
+       snprintf(parent, MAX_NODE_STR_LEN, LOCK_ZNODE "/%u/", dlock->id);
+       while (true) {
+               rc = zoo_create(zhandle, parent, "", 0, &ZOO_OPEN_ACL_UNSAFE,
+                               flags, my_path, MAX_NODE_STR_LEN);
+               if (rc == ZOK)
+                       break;
+               sd_err("failed to create path:%s, %s", my_path, zerror(rc));
+               zk_wait();
+       }
+       sd_debug("create path %s success", my_path);
+
+       /* create node ok now */
+       snprintf(parent, MAX_NODE_STR_LEN, LOCK_ZNODE "/%u", dlock->id);
+       while (true) {
+               zk_get_least_seq(parent, lowest_seq_path, MAX_NODE_STR_LEN,
+                                owner_name, &len);
+
+               /* I got the lock */
+               if (!strncmp(lowest_seq_path, my_path, strlen(my_path))) {
+                       sd_debug("I am master now. %s", lowest_seq_path);
+                       return;
+               }
+
+               /*
+                * I failed to get the lock: set a watch on the current
+                * least znode and sleep on ->wait until zk_watcher()
+                * sees the deletion and unlocks us.
+                *
+                * NOTE(review): ->wait is unlocked by the watcher thread,
+                * not by the thread that locked it; POSIX leaves that
+                * undefined for default mutexes.  A semaphore or condvar
+                * would be safer — confirm before merging.
+                */
+               rc = zoo_exists(zhandle, lowest_seq_path, 1, NULL);
+               if (rc == ZOK) {
+                       sd_debug("call zoo_exists success %s", lowest_seq_path);
+                       pthread_mutex_lock(&dlock->wait);
+               } else {
+                       sd_err("failed to call zoo_exists %s", zerror(rc));
+                       if (rc != ZNONODE)
+                               zk_wait();
+               }
+       }
+}
+
+/*
+ * Release the distributed lock: delete our seq-ephemeral znode (the
+ * deletion fires the waiters' watches via zk_watcher()), then let the
+ * next thread in this daemon compete for the lock.
+ */
+static void zk_unlock(struct cluster_lock *dlock)
+{
+       int rc;
+       rc = zk_delete_node(dlock->ephemeral_path, -1);
+       if (rc != ZOK)
+               sd_err("Failed to delete path: %s %s", dlock->ephemeral_path,
+                      zerror(rc));
+       /* ->local_lock was taken in zk_lock(); hand it to the next thread */
+       pthread_mutex_unlock(&dlock->local_lock);
+}
+
+
 static int zk_init(const char *option)
 {
        char *hosts, *to, *p;
@@ -1102,6 +1228,15 @@ static int zk_init(const char *option)
                return -1;
        }
 
+       /* init distributed lock */
+       dlock_array = xzalloc(sizeof(struct cluster_lock *) * MAX_MUTEX_NR);
+
+       ret = zk_init_node(LOCK_ZNODE);
+       if (ret != ZOK) {
+               sd_err("Failed to create %s %s", LOCK_ZNODE, zerror(ret));
+               free(dlock_array);
+               return -1;
+       }
        return 0;
 }
 
@@ -1122,7 +1257,10 @@ static struct cluster_driver cdrv_zookeeper = {
        .notify     = zk_notify,
        .block      = zk_block,
        .unblock    = zk_unblock,
-       .update_node = zk_update_node,
+       .init_lock  = zk_init_lock,
+       .lock       = zk_lock,
+       .unlock     = zk_unlock,
+       .update_node  = zk_update_node,
        .get_local_addr = get_local_addr,
 };
 
-- 
1.7.1

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to