Signed-off-by: Yu Yang <yuy...@cmss.chinamobile.com> --- sheep/cluster/zookeeper.c | 197 +++++++++++++++++++++++++++++++-------------- sheep/sheep.c | 8 +- 2 files changed, 142 insertions(+), 63 deletions(-)
diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c index 3248af2..87d28bd 100644 --- a/sheep/cluster/zookeeper.c +++ b/sheep/cluster/zookeeper.c @@ -26,15 +26,22 @@ #include "rbtree.h" #define SESSION_TIMEOUT 30000 /* millisecond */ +#define SD_DEFAULT_DOMAIN "sd_domain_default" + +static char base_znode[] = "/sheepdog"; +static char queue_znode[MAX_NODE_STR_LEN] = ""; +static char queue_znode_post[] = "/queue"; +static char member_znode[MAX_NODE_STR_LEN] = ""; +static char member_znode_post[] = "/member"; +static char master_znode[MAX_NODE_STR_LEN] = ""; +static char master_znode_post[] = "/master"; +static char lock_znode[MAX_NODE_STR_LEN] = ""; +static char lock_znode_post[] = "/lock"; -#define BASE_ZNODE "/sheepdog" -#define QUEUE_ZNODE BASE_ZNODE "/queue" -#define MEMBER_ZNODE BASE_ZNODE "/member" -#define MASTER_ZNODE BASE_ZNODE "/master" -#define LOCK_ZNODE BASE_ZNODE "/lock" static int zk_timeout = SESSION_TIMEOUT; static int my_master_seq; +static char sd_domain[MAX_NODE_STR_LEN] = SD_DEFAULT_DOMAIN; /* structure for distributed lock */ struct cluster_lock { @@ -347,7 +354,7 @@ static struct cluster_lock *lock_table_lookup_acquire(uint64_t lock_id) ret_lock = xzalloc(sizeof(*ret_lock)); ret_lock->id = lock_id; ret_lock->ref = 1; - snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%"PRIu64, + snprintf(path, MAX_NODE_STR_LEN, "%s/%"PRIu64,lock_znode, ret_lock->id); rc = zk_init_node(path); if (rc) @@ -399,8 +406,8 @@ static void lock_table_lookup_release(uint64_t lock_id) /* free all resource used by this lock */ sd_destroy_mutex(&lock->id_lock); sem_destroy(&lock->wait_wakeup); - snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%"PRIu64, - lock->id); + snprintf(path, MAX_NODE_STR_LEN, "%s/%"PRIu64, + lock_znode, lock->id); /* * If deletion of directory 'lock_id' fail, we only get * a * empty directory in zookeeper. That's unharmful @@ -458,7 +465,7 @@ static int zk_queue_peek(bool *peek) int rc; char path[MAX_NODE_STR_LEN]; - snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos); + snprintf(path, sizeof(path), "%s/%010"PRId32, queue_znode, queue_pos); rc = zk_node_exists(path); switch (rc) { @@ -483,7 +490,7 @@ static int zk_find_seq_node(uint64_t id, char *seq_path, int seq_path_len, for (int seq = queue_pos; ; seq++) { struct zk_event ev; - snprintf(seq_path, seq_path_len, QUEUE_ZNODE"/%010"PRId32, seq); + snprintf(seq_path, seq_path_len, "%s/%010"PRId32, queue_znode, seq); len = offsetof(typeof(ev), id) + sizeof(ev.id); rc = zk_get_data(seq_path, &ev, &len); switch (rc) { @@ -513,7 +520,7 @@ static int zk_queue_push(struct zk_event *ev) bool found; len = offsetof(typeof(*ev), buf) + ev->buf_len; - snprintf(path, sizeof(path), "%s/", QUEUE_ZNODE); + snprintf(path, sizeof(path), "%s/", queue_znode); again: rc = zk_create_seq_node(path, (char *)ev, len, buf, sizeof(buf), false); switch (rc) { @@ -536,8 +543,11 @@ again: } if (first_push) { int32_t seq; - - sscanf(buf, QUEUE_ZNODE "/%"PRId32, &seq); + + char temp_char_form[MAX_NODE_STR_LEN] = ""; + strcpy(temp_char_form,queue_znode); + strcat(temp_char_form,"/%"PRId32); + sscanf(buf, temp_char_form, &seq); queue_pos = seq; eventfd_xwrite(efd, 1); first_push = false; @@ -568,7 +578,7 @@ static int push_join_response(struct zk_event *ev) queue_pos--; len = offsetof(typeof(*ev), buf) + ev->buf_len; - snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos); + snprintf(path, sizeof(path), "%s/%010"PRId32, queue_znode, queue_pos); RETURN_IF_ERROR(zk_set_data(path, (char *)ev, len, -1), ""); sd_debug("update path:%s, queue_pos:%010" PRId32 ", len:%d", path, @@ -582,7 +592,7 @@ static int zk_queue_pop_advance(struct zk_event *ev) char path[MAX_NODE_STR_LEN]; len = sizeof(*ev); - snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos); + snprintf(path, sizeof(path), "%s/%010"PRId32, queue_znode, queue_pos); RETURN_IF_ERROR(zk_get_data(path, ev, &len), "path %s", path); sd_debug("%s, type:%d, len:%d, pos:%" PRId32, path, ev->type, len, @@ -641,10 +651,15 @@ static inline void build_node_list(void) static int zk_queue_init(void) { - RETURN_IF_ERROR(zk_init_node(BASE_ZNODE), "path %s", BASE_ZNODE); - RETURN_IF_ERROR(zk_init_node(MASTER_ZNODE), "path %s", MASTER_ZNODE); - RETURN_IF_ERROR(zk_init_node(QUEUE_ZNODE), "path %s", QUEUE_ZNODE); - RETURN_IF_ERROR(zk_init_node(MEMBER_ZNODE), "path %s", MEMBER_ZNODE); + char sd_domain_znode[128] = ""; + strcpy(sd_domain_znode, base_znode); + strcat(sd_domain_znode, "/"); + strcat(sd_domain_znode, sd_domain); + RETURN_IF_ERROR(zk_init_node(base_znode), "path %s", base_znode); + RETURN_IF_ERROR(zk_init_node(sd_domain_znode), "path %s", sd_domain_znode); + RETURN_IF_ERROR(zk_init_node(master_znode), "path %s", master_znode); + RETURN_IF_ERROR(zk_init_node(queue_znode), "path %s", queue_znode); + RETURN_IF_ERROR(zk_init_node(member_znode), "path %s", member_znode); return ZOK; } @@ -692,6 +707,7 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path, char str[MAX_NODE_STR_LEN], *p; uint64_t lock_id; int ret; + char temp_char_form[MAX_NODE_STR_LEN] = ""; if (type == ZOO_SESSION_EVENT && state == ZOO_EXPIRED_SESSION_STATE) { /* @@ -705,7 +721,9 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path, /* CREATED_EVENT 1, DELETED_EVENT 2, CHANGED_EVENT 3, CHILD_EVENT 4 */ sd_debug("path:%s, type:%d, state:%d", path, type, state); if (type == ZOO_CREATED_EVENT || type == ZOO_CHANGED_EVENT) { - ret = sscanf(path, MEMBER_ZNODE "/%s", str); + strcpy(temp_char_form,member_znode); + strcat(temp_char_form,"/%s"); + ret = sscanf(path, temp_char_form, str); if (ret == 1) zk_node_exists(path); /* kick off the event handler */ @@ -714,7 +732,9 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path, struct zk_node *n; /* process distributed lock */ - ret = sscanf(path, LOCK_ZNODE "/%"PRIu64"/%s", &lock_id, str); + strcpy(temp_char_form,lock_znode); + strcat(temp_char_form,"/%"PRIu64"/%s"); + ret = sscanf(path, temp_char_form, &lock_id, str); if (ret == 2) { ret = lock_table_lookup_wakeup(lock_id); if (ret) @@ -722,14 +742,18 @@ static void zk_watcher(zhandle_t *zh, int type, int state, const char *path, lock_id, str); return; } - - ret = sscanf(path, MASTER_ZNODE "/%s", str); + + strcpy(temp_char_form,master_znode); + strcat(temp_char_form,"/%s"); + ret = sscanf(path, temp_char_form, str); if (ret == 1) { zk_compete_master(); return; } - ret = sscanf(path, MEMBER_ZNODE "/%s", str); + strcpy(temp_char_form,member_znode); + strcat(temp_char_form,"/%s"); + ret = sscanf(path, temp_char_form, str); if (ret != 1) return; p = strrchr(path, '/'); @@ -815,19 +839,23 @@ static int zk_find_master(int *master_seq, char *master_name) { int rc, len = MAX_NODE_STR_LEN; char master_compete_path[MAX_NODE_STR_LEN]; + char temp_char_form[MAX_NODE_STR_LEN] = ""; if (*master_seq < 0) { - RETURN_IF_ERROR(zk_get_least_seq(MASTER_ZNODE, + RETURN_IF_ERROR(zk_get_least_seq(master_znode, master_compete_path, MAX_NODE_STR_LEN, master_name, &len), ""); - sscanf(master_compete_path, MASTER_ZNODE "/%"PRId32, - master_seq); + strcpy(temp_char_form,master_znode); + strcat(temp_char_form,"/%"PRId32); + sscanf(master_compete_path, temp_char_form, master_seq); return ZOK; } else { while (true) { + strcpy(temp_char_form,master_znode); + strcat(temp_char_form,"/%010"PRId32); snprintf(master_compete_path, len, - MASTER_ZNODE "/%010"PRId32, *master_seq); + temp_char_form, *master_seq); rc = zk_get_data(master_compete_path, master_name, &len); switch (rc) { @@ -854,10 +882,12 @@ static int zk_verify_last_sheep_join(int seq, int *last_sheep) { int rc, len = MAX_NODE_STR_LEN; char path[MAX_NODE_STR_LEN], name[MAX_NODE_STR_LEN]; + char temp_char_form[MAX_NODE_STR_LEN] = ""; for (*last_sheep = seq - 1; *last_sheep >= 0; (*last_sheep)--) { - snprintf(path, MAX_NODE_STR_LEN, MASTER_ZNODE "/%010"PRId32, - *last_sheep); + strcpy(temp_char_form, master_znode); + strcat(temp_char_form, "/%010"PRId32 ); + snprintf(path, MAX_NODE_STR_LEN, temp_char_form, *last_sheep); rc = zk_get_data(path, name, &len); switch (rc) { case ZNONODE: @@ -871,8 +901,9 @@ static int zk_verify_last_sheep_join(int seq, int *last_sheep) if (!strcmp(name, node_to_str(&this_node.node))) continue; - - snprintf(path, MAX_NODE_STR_LEN, MEMBER_ZNODE "/%s", name); + strcpy(temp_char_form, member_znode); + strcat(temp_char_form, "/%s"); + snprintf(path, MAX_NODE_STR_LEN, temp_char_form, name); rc = zk_node_exists(path); switch (rc) { case ZOK: @@ -898,7 +929,7 @@ static void zk_compete_master(void) char master_name[MAX_NODE_STR_LEN]; char my_compete_path[MAX_NODE_STR_LEN]; static int master_seq = -1, my_seq; - + /* * This is to protect master_seq and my_seq because this function will * be called by both main thread and zookeeper's event thread. @@ -909,23 +940,28 @@ static void zk_compete_master(void) goto out_unlock; if (!joined) { + char temp_char_form[MAX_NODE_STR_LEN] = ""; + strcpy(temp_char_form, master_znode); + strcat(temp_char_form, "/" ); sd_debug("start to compete master for the first time"); do { if (uatomic_is_true(&stop)) goto out_unlock; /* duplicate sequential node has no side-effect */ - rc = zk_create_seq_node(MASTER_ZNODE "/", + rc = zk_create_seq_node(temp_char_form, node_to_str(&this_node.node), MAX_NODE_STR_LEN, my_compete_path, MAX_NODE_STR_LEN, true); } while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS); - CHECK_ZK_RC(rc, MASTER_ZNODE "/"); + CHECK_ZK_RC(rc, temp_char_form); if (rc != ZOK) goto out_unlock; - + + strcpy(temp_char_form, master_znode); + strcat(temp_char_form, "/%"PRId32 ); sd_debug("my compete path: %s", my_compete_path); - sscanf(my_compete_path, MASTER_ZNODE "/%"PRId32, + sscanf(my_compete_path, temp_char_form, &my_seq); } @@ -964,10 +1000,12 @@ static int zk_join(const struct sd_node *myself, { int rc; char path[MAX_NODE_STR_LEN]; + char temp_char_form[MAX_NODE_STR_LEN] = ""; this_node.node = *myself; - - snprintf(path, sizeof(path), MEMBER_ZNODE "/%s", node_to_str(myself)); + strcpy(temp_char_form, member_znode ); + strcat(temp_char_form, "/%s"); + snprintf(path, sizeof(path), temp_char_form, node_to_str(myself)); rc = zk_node_exists(path); if (rc == ZOK) { sd_err("Previous zookeeper session exist, shoot myself. Please " @@ -985,17 +1023,21 @@ static int zk_join(const struct sd_node *myself, static int zk_leave(void) { char path[PATH_MAX]; + char temp_char_form[MAX_NODE_STR_LEN] = ""; sd_info("leaving from cluster"); uatomic_set_true(&stop); if (uatomic_is_true(&is_master)) { - snprintf(path, sizeof(path), MASTER_ZNODE "/%010"PRId32, - my_master_seq); + strcpy(temp_char_form, master_znode); + strcat(temp_char_form, "/%010"PRId32 ); + snprintf(path, sizeof(path), temp_char_form, my_master_seq); zk_delete_node(path, -1); } - snprintf(path, sizeof(path), MEMBER_ZNODE"/%s", + strcpy(temp_char_form, member_znode); + strcat(temp_char_form, "/%s" ); + snprintf(path, sizeof(path), temp_char_form, node_to_str(&this_node.node)); add_event(EVENT_LEAVE, &this_node, NULL, 0); lock_table_remove_znodes(); @@ -1038,9 +1080,9 @@ static void watch_all_nodes(void) struct String_vector strs; char path[MAX_NODE_STR_LEN]; - RETURN_VOID_IF_ERROR(zk_get_children(MEMBER_ZNODE, &strs), ""); + RETURN_VOID_IF_ERROR(zk_get_children(member_znode, &strs), ""); - FOR_EACH_ZNODE(MEMBER_ZNODE, path, &strs) { + FOR_EACH_ZNODE(member_znode, path, &strs) { RETURN_VOID_IF_ERROR(zk_node_exists(path), ""); } } @@ -1066,6 +1108,7 @@ static void zk_handle_accept(struct zk_event *ev) { char path[MAX_NODE_STR_LEN]; int rc; + char temp_char_form[MAX_NODE_STR_LEN] = ""; sd_debug("ACCEPT"); if (node_eq(&ev->sender.node, &this_node.node)) @@ -1074,7 +1117,9 @@ static void zk_handle_accept(struct zk_event *ev) sd_debug("%s", node_to_str(&ev->sender.node)); - snprintf(path, sizeof(path), MEMBER_ZNODE"/%s", + strcpy(temp_char_form,member_znode); + strcat(temp_char_form, "/%s"); + snprintf(path, sizeof(path), temp_char_form, node_to_str(&ev->sender.node)); if (node_eq(&ev->sender.node, &this_node.node)) { joined = true; @@ -1286,18 +1331,22 @@ static void zk_lock(uint64_t lock_id) char lowest_seq_path[MAX_NODE_STR_LEN]; char owner_name[MAX_NODE_STR_LEN]; struct cluster_lock *cluster_lock; + char temp_char_form[MAX_NODE_STR_LEN] = ""; cluster_lock = lock_table_lookup_acquire(lock_id); my_path = cluster_lock->lock_path; - snprintf(parent, MAX_NODE_STR_LEN, LOCK_ZNODE "/%"PRIu64"/", + strcpy(temp_char_form, lock_znode ); + strcat(temp_char_form, "/%"PRIu64"/" ); + snprintf(parent, MAX_NODE_STR_LEN, temp_char_form, cluster_lock->id); /* * It need using path without end of '/' to create node of lock_id in * zookeeper's API, so we use 'parent_node'. */ - snprintf(parent_node, MAX_NODE_STR_LEN, LOCK_ZNODE "/%"PRIu64, + temp_char_form[strlen(temp_char_form)-1] = '\0'; + snprintf(parent_node, MAX_NODE_STR_LEN, temp_char_form, cluster_lock->id); create_seq_node: /* compete owner of lock is just like zk_compete_master() */ @@ -1359,7 +1408,8 @@ static void zk_unlock(uint64_t lock_id) static int zk_init(const char *option) { - char *hosts, *to, *p; + char hosts[MAX_NODE_STR_LEN]; + const char *pt, *pd; int ret, interval, retry = 0, max_retry; if (!option) { @@ -1367,17 +1417,28 @@ static int zk_init(const char *option) return -1; } - hosts = strtok((char *)option, "="); - if ((to = strtok(NULL, "="))) { - if (sscanf(to, "%u", &zk_timeout) != 1) { - sd_err("Invalid parameter for timeout"); - return -1; - } - p = strstr(hosts, "timeout"); - *--p = '\0'; + pt = strstr(option,"timeout="); + pd = strstr(option,"domain="); + if( pt==NULL && pd==NULL ){ + strcpy(hosts,option); + }else if( pt ){ + int i = 0; + while(option != pt) + hosts[i++] = *option++; + hosts[i-1] = '\0'; + sscanf(pt,"timeout=%d",&zk_timeout); + if( pd ) + sscanf(pd,"domain=%s",sd_domain); + }else{ + int i = 0; + while(option != pd) + hosts[i++] = *option++; + hosts[i-1] = '\0'; + sscanf(pd,"domain=%s",sd_domain); } - sd_debug("version %d.%d.%d, address %s, timeout %d", ZOO_MAJOR_VERSION, - ZOO_MINOR_VERSION, ZOO_PATCH_VERSION, hosts, zk_timeout); + + sd_debug("version %d.%d.%d, address %s, timeout %d, sheepdog domain %s", ZOO_MAJOR_VERSION, + ZOO_MINOR_VERSION, ZOO_PATCH_VERSION, hosts, zk_timeout, sd_domain); zhandle = zookeeper_init(hosts, zk_watcher, zk_timeout, NULL, NULL, 0); if (!zhandle) { sd_err("failed to initialize zk server %s", option); @@ -1398,6 +1459,18 @@ static int zk_init(const char *option) uatomic_set_false(&stop); uatomic_set_false(&is_master); + strcpy(master_znode,base_znode); + strcat(master_znode,"/"); + strcat(master_znode,sd_domain); + strcat(master_znode,master_znode_post); + strcpy(queue_znode,base_znode); + strcat(queue_znode,"/"); + strcat(queue_znode,sd_domain); + strcat(queue_znode,queue_znode_post); + strcpy(member_znode,base_znode); + strcat(member_znode,"/"); + strcat(member_znode,sd_domain); + strcat(member_znode,member_znode_post); if (zk_queue_init() != ZOK) return -1; @@ -1421,9 +1494,13 @@ static int zk_init(const char *option) sd_init_mutex(table_locks + i); } - ret = zk_init_node(LOCK_ZNODE); + strcpy(lock_znode,base_znode); + strcat(lock_znode,"/"); + strcat(lock_znode,sd_domain); + strcat(lock_znode,lock_znode_post); + ret = zk_init_node(lock_znode); if (ret != ZOK) { - sd_err("Failed to create %s %s", LOCK_ZNODE, zerror(ret)); + sd_err("Failed to create %s %s", lock_znode, zerror(ret)); free(cluster_locks_table); return -1; } diff --git a/sheep/sheep.c b/sheep/sheep.c index 7d5fa0f..a5ba208 100644 --- a/sheep/sheep.c +++ b/sheep/sheep.c @@ -68,12 +68,14 @@ static const char cluster_help[] = "\tlocal: use local driver\n" "\tcorosync: use corosync driver\n" "\tzookeeper: use zookeeper driver, need extra arguments\n" -"\n\tzookeeper arguments: address-list,timeout=value (default as 3000)\n" +"\n\tzookeeper arguments: address-list,timeout=value(default as 3000),domain=value(default as sd_domain_default)\n" "\nExample:\n\t" -"$ sheep -c zookeeper:IP1:PORT1,IP2:PORT2,IP3:PORT3,timeout=1000 ...\n" +"$ sheep -c zookeeper:IP1:PORT1,IP2:PORT2,IP3:PORT3,timeout=1000,domain=sheep_domain ...\n" "This tries to use 3 node zookeeper cluster, which can be reached by\n" "IP1:PORT1, IP2:PORT2, IP3:PORT3 to manage membership and broadcast message\n" -"and set the timeout of node heartbeat as 1000 milliseconds\n"; +"and set the timeout of node heartbeat as 1000 milliseconds\n" +"and join the domain sheep_domain.\n" +"Notice that timeout should be followed by domain if both are given explicitly.\n"; static const char cache_help[] = "Available arguments:\n" -- 1.7.9.5 -- sheepdog mailing list sheepdog@lists.wpkg.org https://lists.wpkg.org/mailman/listinfo/sheepdog