Implement the distributed lock with zookeeper (refer to: http://zookeeper.apache.org/doc/trunk/recipes.html).

The routine is:
1. create a seq-ephemeral znode in the lock directory (using the lock-id as the dir name)
2. take the smallest file path as the owner of the lock; the other threads wait on a pthread_mutex_t
3. if the owner of the lock releases it (or the owner is killed by accident), zookeeper will trigger zk_watcher(), which will wake up all waiting threads to compete to become the new owner of the lock
We add ->local_lock to dist_mutex to keep the many threads of one sheepdog
daemon from creating too many files in a lock directory.

Signed-off-by: Robin Dong <[email protected]>
---
 sheep/cluster.h           |   26 ++++++
 sheep/cluster/zookeeper.c |  171 ++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 196 insertions(+), 1 deletions(-)

diff --git a/sheep/cluster.h b/sheep/cluster.h
index 81b5ae4..0aa058c 100644
--- a/sheep/cluster.h
+++ b/sheep/cluster.h
@@ -23,6 +23,20 @@
 #include "sheep.h"
 #include "config.h"
 
+/*
+ * A cluster-wide mutex backed by a zookeeper lock directory.
+ *
+ * ->local_lock serializes the threads of this daemon, so a daemon holds at
+ * most one ephemeral znode per lock; ->wait is what the local competitor
+ * blocks on until the znode of the current owner is deleted.
+ */
+struct dist_mutex {
+	uint32_t id;		/* id of this mutex, names the lock dir */
+	pthread_mutex_t wait;
+	pthread_mutex_t local_lock;
+	char ephemeral_path[MAX_NODE_STR_LEN];	/* znode of this daemon */
+};
+
 /*
  * maximum payload size sent in ->notify and ->unblock, it should be large
  * enough to support COROSYNC_MAX_NODES * struct sd_node
@@ -109,6 +123,18 @@ struct cluster_driver {
 	int (*unblock)(void *msg, size_t msg_len);
 
 	/*
+	 * A distributed mutex lock to avoid race conditions when using the
+	 * swift interface to add/delete/list objects.
+	 *
+	 * ->init_mutex returns 0 on success and non-zero on failure,
+	 * ->lock_mutex blocks until the lock is owned and ->unlock_mutex
+	 * releases it.
+	 */
+	int (*init_mutex)(struct dist_mutex *mutex, uint32_t id);
+	void (*lock_mutex)(struct dist_mutex *mutex);
+	void (*unlock_mutex)(struct dist_mutex *mutex);
+
+	/*
 	 * Update the specific node in the driver's private copy of nodes
 	 *
 	 * Returns SD_RES_XXX
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index fa89c46..ddf6a22 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -30,6 +30,22 @@
 #define QUEUE_ZNODE BASE_ZNODE "/queue"
 #define MEMBER_ZNODE BASE_ZNODE "/member"
 #define MASTER_ZNONE BASE_ZNODE "/master"
+#define LOCK_ZNODE BASE_ZNODE "/lock"
+
+#define MAX_MUTEX_NR 4096
+#define WAIT_TIME 1 /* second */
+
+/* registered mutexes indexed by lock id, looked up by zk_watcher() */
+static struct dist_mutex **mutex_array;
+
+/*
+ * Sleep for a while when create, delete or get_children fails on a
+ * zookeeper lock, so that the retry loops do not flood the log
+ */
+static void zk_wait(void)
+{
+	sleep(WAIT_TIME);
+}
 
 /* iterate child znodes */
 #define FOR_EACH_ZNODE(parent, path, strs) \
@@ -506,6 +522,7 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 {
 	struct zk_node znode;
 	char str[MAX_NODE_STR_LEN], *p;
+	uint32_t lock_id;
 	int ret;
 
 	if (type == ZOO_SESSION_EVENT && state == ZOO_EXPIRED_SESSION_STATE) {
@@ -528,6 +545,20 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path,
 	} else if (type == ZOO_DELETED_EVENT) {
 		struct zk_node *n;
 
+		/*
+		 * A znode in a lock directory was deleted: wake up the
+		 * local thread waiting on that distributed lock.
+		 * NOTE(review): ->wait is unlocked here by a thread other
+		 * than the one that locked it, which POSIX leaves undefined
+		 * for default mutexes - a semaphore may be safer, confirm.
+		 */
+		ret = sscanf(path, LOCK_ZNODE "/%u/%s", &lock_id, str);
+		if (ret == 2 && lock_id < MAX_MUTEX_NR &&
+		    mutex_array && mutex_array[lock_id]) {
+			pthread_mutex_unlock(&(mutex_array[lock_id]->wait));
+			sd_debug("release lock %u %s", lock_id, str);
+		}
+
 		ret = sscanf(path, MASTER_ZNONE "/%s", str);
 		if (ret == 1) {
 			zk_compete_master();
@@ -1058,6 +1089,131 @@ kick_block_event:
 	kick_block_event();
 }
 
+/*
+ * Initialize 'mutex' as the distributed lock 'id' and register it so that
+ * zk_watcher() can wake up its waiter.
+ *
+ * Returns 0 on success and non-zero on failure.
+ */
+static int zk_init_mutex(struct dist_mutex *mutex, uint32_t id)
+{
+	int rc;
+	char path[MAX_NODE_STR_LEN];
+
+	/* mutex_array has MAX_MUTEX_NR slots: the last valid id is NR - 1 */
+	if (id >= MAX_MUTEX_NR) {
+		sd_err("lock-id is too large!");
+		/* do not go to err: it would write past mutex_array */
+		return -1;
+	}
+
+	mutex_array[id] = mutex;
+	mutex->id = id;
+	snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%u", mutex->id);
+	rc = zk_init_node(path);
+	if (rc)
+		goto err;
+
+	rc = pthread_mutex_init(&mutex->wait, NULL);
+	if (rc) {
+		sd_err("failed to init mutex->wait");
+		goto err;
+	}
+
+	rc = pthread_mutex_init(&mutex->local_lock, NULL);
+	if (rc) {
+		sd_err("failed to init mutex->local_lock");
+		pthread_mutex_destroy(&mutex->wait);
+		goto err;
+	}
+
+	return 0;
+err:
+	mutex_array[id] = NULL;
+	return rc;
+}
+
+/*
+ * Acquire the distributed lock, blocking until this daemon owns it.
+ *
+ * Returns with ->local_lock held; zk_unlock_mutex() releases it.
+ */
+static void zk_lock_mutex(struct dist_mutex *mutex)
+{
+	int flags = ZOO_SEQUENCE | ZOO_EPHEMERAL;
+	int rc, len;
+	char *my_path;
+	char parent[MAX_NODE_STR_LEN];
+	char lowest_seq_path[MAX_NODE_STR_LEN];
+	char owner_name[MAX_NODE_STR_LEN];
+
+	/*
+	 * If many threads use locks with the same id, ->local_lock keeps
+	 * the single zookeeper handle from creating many seq-ephemeral
+	 * znodes.
+	 */
+	pthread_mutex_lock(&mutex->local_lock);
+
+	my_path = mutex->ephemeral_path;
+
+	/* competing for lock ownership is just like zk_compete_master() */
+	snprintf(parent, MAX_NODE_STR_LEN, LOCK_ZNODE "/%u/", mutex->id);
+	while (true) {
+		rc = zoo_create(zhandle, parent, "", 0, &ZOO_OPEN_ACL_UNSAFE,
+				flags, my_path, MAX_NODE_STR_LEN);
+		if (rc == ZOK)
+			break;
+		/* my_path is only filled in on success: log the request */
+		sd_err("failed to create path:%s, %s", parent, zerror(rc));
+		zk_wait();
+	}
+	sd_debug("create path %s success", my_path);
+
+	/* our znode exists now; loop until it is the lowest sequence */
+	snprintf(parent, MAX_NODE_STR_LEN, LOCK_ZNODE "/%u", mutex->id);
+	while (true) {
+		/*
+		 * len is passed by pointer, so reset it in case the
+		 * previous call changed it.
+		 */
+		len = MAX_NODE_STR_LEN;
+		zk_get_least_seq(parent, lowest_seq_path, MAX_NODE_STR_LEN,
+				 owner_name, &len);
+
+		/* I got the lock */
+		if (!strncmp(lowest_seq_path, my_path, strlen(my_path))) {
+			sd_debug("I am master now. %s", lowest_seq_path);
+			return;
+		}
+
+		/* I failed to get the lock: watch the owner and wait */
+		rc = zoo_exists(zhandle, lowest_seq_path, 1, NULL);
+		if (rc == ZOK) {
+			sd_debug("call zoo_exists success %s", lowest_seq_path);
+			pthread_mutex_lock(&mutex->wait);
+		} else {
+			sd_err("failed to call zoo_exists %s", zerror(rc));
+			if (rc != ZNONODE)
+				zk_wait();
+		}
+	}
+}
+
+/*
+ * Release the distributed lock: delete the ephemeral znode of this daemon,
+ * then drop ->local_lock so that local threads can compete again.
+ */
+static void zk_unlock_mutex(struct dist_mutex *mutex)
+{
+	int rc;
+
+	rc = zk_delete_node(mutex->ephemeral_path, -1);
+	if (rc != ZOK)
+		sd_err("Failed to delete path: %s %s", mutex->ephemeral_path,
+		       zerror(rc));
+	pthread_mutex_unlock(&mutex->local_lock);
+}
+
 static int zk_init(const char *option)
 {
 	char *hosts, *to, *p;
@@ -1102,6 +1258,16 @@ static int zk_init(const char *option)
 		return -1;
 	}
 
+	/* init distributed mutex lock */
+	mutex_array = xzalloc(sizeof(*mutex_array) * MAX_MUTEX_NR);
+
+	ret = zk_init_node(LOCK_ZNODE);
+	if (ret != ZOK) {
+		sd_err("Failed to create %s %s", LOCK_ZNODE, zerror(ret));
+		free(mutex_array);
+		mutex_array = NULL;	/* zk_watcher() checks it for NULL */
+		return -1;
+	}
 	return 0;
 }
 
@@ -1122,7 +1288,10 @@ static struct cluster_driver cdrv_zookeeper = {
 	.notify = zk_notify,
 	.block = zk_block,
 	.unblock = zk_unblock,
-	.update_node = zk_update_node,
+	.init_mutex = zk_init_mutex,
+	.lock_mutex = zk_lock_mutex,
+	.unlock_mutex = zk_unlock_mutex,
+	.update_node = zk_update_node,
 	.get_local_addr = get_local_addr,
 };
 
--
1.7.1

--
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog
