On Sun, Dec 01, 2013 at 03:48:04PM +0800, Robin Dong wrote:
> Implement the distributed lock by zookeeper (refer: 
> http://zookeeper.apache.org/doc/trunk/recipes.html)
> The routine is:
>         1. create a seq-ephemeral znode in lock directory (use lock-id as dir 
> name)
>         2. get smallest file path as owner of the lock; the other thread wait 
> on a pthread_mutex_t (cluster_lock->wait)
>         3. if owner of the lock release it (or the owner is killed by 
> accident), zookeeper will
>            trigger zk_watch() which will wake up all waiting threads to 
> compete new owner of the lock
> 
> We use dlock_array to store pointers of cluster_locks in this sheep daemon so 
> when receiving the event of ZOO_DELETED_EVENT
> the program will wake up all waiters (in this sheep daemon) who is sleeping 
> on the lock id and let them compete for new
> owner.
> dlock_array is just a normal array using lock-id as index, so imagine a 
> scenario: two threads (A and B) in one sheep daemon
> call zk_lock() for same lock-id, they will create two znodes in zookeeper but 
> set dlock_array[lock_id] to only one of
> them (for example, set to B). After that, when ZOO_DELETED_EVENT comes, the 
> zk_waiter() will only wake up thread B and thread A
> will sleep on '->wait' forever because no one could wake it up.
> We have two method to solve this problem:
>       A. using more complicated structure instead of dlock_array to store 
> both A and B 's lock handle.
>       B. adding a lock to avoid A and B call zk_lock() in the same time.
> We prefer method B because it also avoid creating too many files in a 
> directory of zookeeper which will take too much pressure
> on zookeeper server if the number of sheep daemons is huge. Therefore we add 
> 'local_lock' in 'struct cluster_lock'.
> 
> Signed-off-by: Robin Dong <[email protected]>
> ---
>  include/sheep.h           |    8 +++
>  sheep/cluster.h           |   34 +++++++++++
>  sheep/cluster/corosync.c  |   16 +++++
>  sheep/cluster/local.c     |   16 +++++
>  sheep/cluster/zookeeper.c |  140 
> ++++++++++++++++++++++++++++++++++++++++++++-
>  5 files changed, 213 insertions(+), 1 deletions(-)
> 
> diff --git a/include/sheep.h b/include/sheep.h
> index 293e057..fd7258b 100644
> --- a/include/sheep.h
> +++ b/include/sheep.h
> @@ -255,6 +255,14 @@ static inline void nodes_to_buffer(struct rb_root 
> *nroot, void *buffer)
>  
>  #define MAX_NODE_STR_LEN 256
>  
> +/* structure for distributed lock */
> +struct cluster_lock {
> +     uint64_t id;            /* id of this mutex */
> +     pthread_mutex_t wait;
> +     pthread_mutex_t local_lock;
> +     char ephemeral_path[MAX_NODE_STR_LEN];
> +};
> +
>  static inline const char *node_to_str(const struct sd_node *id)
>  {
>       static __thread char str[MAX_NODE_STR_LEN];
> diff --git a/sheep/cluster.h b/sheep/cluster.h
> index 81b5ae4..f0950ac 100644
> --- a/sheep/cluster.h
> +++ b/sheep/cluster.h
> @@ -109,6 +109,40 @@ struct cluster_driver {
>       int (*unblock)(void *msg, size_t msg_len);
>  
>       /*
> +      * Init a distributed mutually exclusive lock to avoid race condition
> +      * when the whole sheepdog cluster process one exclusive resource.
> +      *
> +      * This function use 'lock_id' as the id of this distributed lock.
> +      * A thread can create many locks in one sheep daemon.
> +      *
> +      * Returns SD_RES_XXX
> +      */
> +     int (*init_lock)(struct cluster_lock *lock, uint64_t lock_id);
> +
> +     /*
> +      * Acquire the distributed lock.
> +      *
> +      * The cluster_lock referenced by 'lock' shall be locked by calling
> +      * cluster->lock(). If the cluster_lock is already locked, the calling
> +      * thread shall block until the cluster_lock becomes available.
> +      *
> +      * This operation will create a seq-ephemeral znode in lock directory
> +      * of zookeeper (use lock-id as dir name). The smallest file path in
> +      * this directory wil be the owner of the lock; the other threads will
> +      * wait on a pthread_mutex_t (cluster_lock->wait)
> +      */
>

The second paragraph should be moved to zookeeper.c, since it describes only the
zookeeper implementation.

>
> +     void (*lock)(struct cluster_lock *lock);
> +
> +     /*
> +      * Release the distributed lock.
> +      *
> +      * If the owner of the cluster_lock release it (or the owner is
> +      * killed by accident), zookeeper will trigger zk_watch() which will
> +      * wake up all waiting threads to compete new owner of the lock
> +      */
> +     void (*unlock)(struct cluster_lock *lock);
> +
> +     /*
>        * Update the specific node in the driver's private copy of nodes
>        *
>        * Returns SD_RES_XXX
> diff --git a/sheep/cluster/corosync.c b/sheep/cluster/corosync.c
> index ea4421b..35c95d3 100644
> --- a/sheep/cluster/corosync.c
> +++ b/sheep/cluster/corosync.c
> @@ -774,6 +774,19 @@ again:
>       return 0;
>  }
>  
> +static int corosync_init_lock(struct cluster_lock *dlock, uint32_t id)
> +{
> +     return -1;
> +}
> +
> +static void corosync_lock(struct cluster_lock *dlock)
> +{
> +}
> +
> +static void corosync_unlock(struct cluster_lock *dlock)
> +{
> +}
> +
>  static int corosync_update_node(struct sd_node *node)
>  {
>       struct cpg_node cnode = this_node;
> @@ -794,6 +807,9 @@ static struct cluster_driver cdrv_corosync = {
>       .notify         = corosync_notify,
>       .block          = corosync_block,
>       .unblock        = corosync_unblock,
> +     .init_lock      = corosync_init_lock,
> +     .lock           = corosync_lock,
> +     .unlock         = corosync_unlock,
>       .update_node    = corosync_update_node,
>  };
>  
> diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
> index b8cbb5c..69d9d6f 100644
> --- a/sheep/cluster/local.c
> +++ b/sheep/cluster/local.c
> @@ -547,6 +547,19 @@ static int local_init(const char *option)
>       return 0;
>  }
>  
> +static int local_init_lock(struct cluster_lock *dlock, uint64_t id)
> +{
> +     return -1;
> +}
> +
> +static void local_lock(struct cluster_lock *dlock)
> +{
> +}
> +
> +static void local_unlock(struct cluster_lock *dlock)
> +{
> +}
> +
>  static int local_update_node(struct sd_node *node)
>  {
>       struct local_node lnode = this_node;
> @@ -566,6 +579,9 @@ static struct cluster_driver cdrv_local = {
>       .notify         = local_notify,
>       .block          = local_block,
>       .unblock        = local_unblock,
> +     .init_lock      = local_init_lock,
> +     .lock           = local_lock,
> +     .unlock         = local_unlock,
>       .update_node    = local_update_node,
>  };
>  
> diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
> index fa89c46..01cb37c 100644
> --- a/sheep/cluster/zookeeper.c
> +++ b/sheep/cluster/zookeeper.c
> @@ -30,6 +30,21 @@
>  #define QUEUE_ZNODE BASE_ZNODE "/queue"
>  #define MEMBER_ZNODE BASE_ZNODE "/member"
>  #define MASTER_ZNONE BASE_ZNODE "/master"
> +#define LOCK_ZNODE BASE_ZNODE "/lock"
> +
> +#define MAX_MUTEX_NR 4096
> +#define WAIT_TIME    1               /* second */
> +
> +static struct cluster_lock **dlock_array;
> +
> +/*
> + * Wait a while when create, delete or get_children fail on
> + * zookeeper lock so it will not print too much loop log
> + */
> +static void zk_wait(void)
> +{
> +     sleep(WAIT_TIME);
> +}
>  
>  /* iterate child znodes */
>  #define FOR_EACH_ZNODE(parent, path, strs)                          \
> @@ -506,6 +521,7 @@ static void zk_watcher(zhandle_t *zh, int type, int 
> state, const char *path,
>  {
>       struct zk_node znode;
>       char str[MAX_NODE_STR_LEN], *p;
> +     uint64_t lock_id;
>       int ret;
>  
>       if (type == ZOO_SESSION_EVENT && state == ZOO_EXPIRED_SESSION_STATE) {
> @@ -528,6 +544,14 @@ static void zk_watcher(zhandle_t *zh, int type, int 
> state, const char *path,
>       } else if (type == ZOO_DELETED_EVENT) {
>               struct zk_node *n;
>  
> +             /* process distributed lock */
> +             ret = sscanf(path, LOCK_ZNODE "/%lu/%s", &lock_id, str);
> +             if (ret == 2 && lock_id < MAX_MUTEX_NR &&
> +                 dlock_array && dlock_array[lock_id]) {
> +                     pthread_mutex_unlock(&(dlock_array[lock_id]->wait));
> +                     sd_debug("release lock %lu %s", lock_id, str);
> +             }
> +
>               ret = sscanf(path, MASTER_ZNONE "/%s", str);
>               if (ret == 1) {
>                       zk_compete_master();
> @@ -1058,6 +1082,108 @@ kick_block_event:
>       kick_block_event();
>  }
>  
> +static int zk_init_lock(struct cluster_lock *dlock, uint64_t lock_id)

Rename 'dlock' to 'clock' for better uniformity.

> +{
> +     int rc = 0;
> +     char path[MAX_NODE_STR_LEN];
> +
> +     if (lock_id > MAX_MUTEX_NR) {

If we constrain lock_id to [0, MAX_MUTEX_NR], then using uint64_t is meaningless.
IIUC, lock_id is the unique identifier that represents which lock is being
operated on. This means we could use vid + number to represent which lock the
code is trying to hold.
But with the above check, vid + number will apparently always be greater than
MAX_MUTEX_NR, no? So it seems that different clients can't simply use 'vid+xxx'
to represent a lock without extra communication.

Thanks
Yuan
-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to