Signed-off-by: Yu Yang <yuy...@cmss.chinamobile.com>
---
 sheep/cluster/zookeeper.c |  197 +++++++++++++++++++++++++++++++--------------
 sheep/sheep.c             |    8 +-
 2 files changed, 142 insertions(+), 63 deletions(-)

diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 3248af2..87d28bd 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -26,15 +26,22 @@
 #include "rbtree.h"
 
 #define SESSION_TIMEOUT 30000          /* millisecond */
+#define SD_DEFAULT_DOMAIN "sd_domain_default"
+
+static char base_znode[] = "/sheepdog";
+static char queue_znode[MAX_NODE_STR_LEN] = "";
+static char queue_znode_post[] = "/queue";
+static char member_znode[MAX_NODE_STR_LEN] = "";
+static char member_znode_post[] = "/member";
+static char master_znode[MAX_NODE_STR_LEN] = "";
+static char master_znode_post[] = "/master";
+static char lock_znode[MAX_NODE_STR_LEN] = "";
+static char lock_znode_post[] = "/lock";
 
-#define BASE_ZNODE "/sheepdog"
-#define QUEUE_ZNODE BASE_ZNODE "/queue"
-#define MEMBER_ZNODE BASE_ZNODE "/member"
-#define MASTER_ZNODE BASE_ZNODE "/master"
-#define LOCK_ZNODE BASE_ZNODE "/lock"
 
 static int zk_timeout = SESSION_TIMEOUT;
 static int my_master_seq;
+static char sd_domain[MAX_NODE_STR_LEN] = SD_DEFAULT_DOMAIN;
 
 /* structure for distributed lock */
 struct cluster_lock {
@@ -347,7 +354,7 @@ static struct cluster_lock 
*lock_table_lookup_acquire(uint64_t lock_id)
                ret_lock = xzalloc(sizeof(*ret_lock));
                ret_lock->id = lock_id;
                ret_lock->ref = 1;
-               snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%"PRIu64,
+               snprintf(path, MAX_NODE_STR_LEN, "%s/%"PRIu64,lock_znode,
                         ret_lock->id);
                rc = zk_init_node(path);
                if (rc)
@@ -399,8 +406,8 @@ static void lock_table_lookup_release(uint64_t lock_id)
                        /* free all resource used by this lock */
                        sd_destroy_mutex(&lock->id_lock);
                        sem_destroy(&lock->wait_wakeup);
-                       snprintf(path, MAX_NODE_STR_LEN, LOCK_ZNODE "/%"PRIu64,
-                                lock->id);
+                       snprintf(path, MAX_NODE_STR_LEN, "%s/%"PRIu64,
+                                lock_znode, lock->id);
                        /*
                         * If deletion of directory 'lock_id' fail, we only get
                         * a * empty directory in zookeeper. That's unharmful
@@ -458,7 +465,7 @@ static int zk_queue_peek(bool *peek)
        int rc;
        char path[MAX_NODE_STR_LEN];
 
-       snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
+       snprintf(path, sizeof(path), "%s/%010"PRId32, queue_znode, queue_pos);
 
        rc = zk_node_exists(path);
        switch (rc) {
@@ -483,7 +490,7 @@ static int zk_find_seq_node(uint64_t id, char *seq_path, 
int seq_path_len,
        for (int seq = queue_pos; ; seq++) {
                struct zk_event ev;
 
-               snprintf(seq_path, seq_path_len, QUEUE_ZNODE"/%010"PRId32, seq);
+               snprintf(seq_path, seq_path_len, "%s/%010"PRId32, queue_znode, 
seq);
                len = offsetof(typeof(ev), id) + sizeof(ev.id);
                rc = zk_get_data(seq_path, &ev, &len);
                switch (rc) {
@@ -513,7 +520,7 @@ static int zk_queue_push(struct zk_event *ev)
        bool found;
 
        len = offsetof(typeof(*ev), buf) + ev->buf_len;
-       snprintf(path, sizeof(path), "%s/", QUEUE_ZNODE);
+       snprintf(path, sizeof(path), "%s/", queue_znode);
 again:
        rc = zk_create_seq_node(path, (char *)ev, len, buf, sizeof(buf), false);
        switch (rc) {
@@ -536,8 +543,11 @@ again:
        }
        if (first_push) {
                int32_t seq;
-
-               sscanf(buf, QUEUE_ZNODE "/%"PRId32, &seq);
+               
+               char temp_char_form[MAX_NODE_STR_LEN] = "";
+               strcpy(temp_char_form,queue_znode);
+               strcat(temp_char_form,"/%"PRId32);
+               sscanf(buf, temp_char_form, &seq);
                queue_pos = seq;
                eventfd_xwrite(efd, 1);
                first_push = false;
@@ -568,7 +578,7 @@ static int push_join_response(struct zk_event *ev)
        queue_pos--;
 
        len = offsetof(typeof(*ev), buf) + ev->buf_len;
-       snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
+       snprintf(path, sizeof(path), "%s/%010"PRId32, queue_znode, queue_pos);
 
        RETURN_IF_ERROR(zk_set_data(path, (char *)ev, len, -1), "");
        sd_debug("update path:%s, queue_pos:%010" PRId32 ", len:%d", path,
@@ -582,7 +592,7 @@ static int zk_queue_pop_advance(struct zk_event *ev)
        char path[MAX_NODE_STR_LEN];
 
        len = sizeof(*ev);
-       snprintf(path, sizeof(path), QUEUE_ZNODE "/%010"PRId32, queue_pos);
+       snprintf(path, sizeof(path), "%s/%010"PRId32, queue_znode, queue_pos);
 
        RETURN_IF_ERROR(zk_get_data(path, ev, &len), "path %s", path);
        sd_debug("%s, type:%d, len:%d, pos:%" PRId32, path, ev->type, len,
@@ -641,10 +651,15 @@ static inline void build_node_list(void)
 
 static int zk_queue_init(void)
 {
-       RETURN_IF_ERROR(zk_init_node(BASE_ZNODE), "path %s", BASE_ZNODE);
-       RETURN_IF_ERROR(zk_init_node(MASTER_ZNODE), "path %s", MASTER_ZNODE);
-       RETURN_IF_ERROR(zk_init_node(QUEUE_ZNODE), "path %s", QUEUE_ZNODE);
-       RETURN_IF_ERROR(zk_init_node(MEMBER_ZNODE), "path %s", MEMBER_ZNODE);
+       char sd_domain_znode[128] = "";
+       strcpy(sd_domain_znode, base_znode);
+       strcat(sd_domain_znode, "/");
+       strcat(sd_domain_znode, sd_domain);
+       RETURN_IF_ERROR(zk_init_node(base_znode), "path %s", base_znode);
+       RETURN_IF_ERROR(zk_init_node(sd_domain_znode), "path %s", 
sd_domain_znode);
+       RETURN_IF_ERROR(zk_init_node(master_znode), "path %s", master_znode);
+       RETURN_IF_ERROR(zk_init_node(queue_znode), "path %s", queue_znode);
+       RETURN_IF_ERROR(zk_init_node(member_znode), "path %s", member_znode);
        return ZOK;
 }
 
@@ -692,6 +707,7 @@ static void zk_watcher(zhandle_t *zh, int type, int state, 
const char *path,
        char str[MAX_NODE_STR_LEN], *p;
        uint64_t lock_id;
        int ret;
+       char temp_char_form[MAX_NODE_STR_LEN] = "";
 
        if (type == ZOO_SESSION_EVENT && state == ZOO_EXPIRED_SESSION_STATE) {
                /*
@@ -705,7 +721,9 @@ static void zk_watcher(zhandle_t *zh, int type, int state, 
const char *path,
 /* CREATED_EVENT 1, DELETED_EVENT 2, CHANGED_EVENT 3, CHILD_EVENT 4 */
        sd_debug("path:%s, type:%d, state:%d", path, type, state);
        if (type == ZOO_CREATED_EVENT || type == ZOO_CHANGED_EVENT) {
-               ret = sscanf(path, MEMBER_ZNODE "/%s", str);
+               strcpy(temp_char_form,member_znode);
+               strcat(temp_char_form,"/%s");
+               ret = sscanf(path, temp_char_form, str);
                if (ret == 1)
                        zk_node_exists(path);
                /* kick off the event handler */
@@ -714,7 +732,9 @@ static void zk_watcher(zhandle_t *zh, int type, int state, 
const char *path,
                struct zk_node *n;
 
                /* process distributed lock */
-               ret = sscanf(path, LOCK_ZNODE "/%"PRIu64"/%s", &lock_id, str);
+               strcpy(temp_char_form,lock_znode);
+               strcat(temp_char_form,"/%"PRIu64"/%s");
+               ret = sscanf(path, temp_char_form, &lock_id, str);
                if (ret == 2) {
                        ret = lock_table_lookup_wakeup(lock_id);
                        if (ret)
@@ -722,14 +742,18 @@ static void zk_watcher(zhandle_t *zh, int type, int 
state, const char *path,
                                         lock_id, str);
                        return;
                }
-
-               ret = sscanf(path, MASTER_ZNODE "/%s", str);
+               
+               strcpy(temp_char_form,master_znode);
+               strcat(temp_char_form,"/%s");
+               ret = sscanf(path, temp_char_form, str);
                if (ret == 1) {
                        zk_compete_master();
                        return;
                }
 
-               ret = sscanf(path, MEMBER_ZNODE "/%s", str);
+               strcpy(temp_char_form,member_znode);
+               strcat(temp_char_form,"/%s");
+               ret = sscanf(path, temp_char_form, str);
                if (ret != 1)
                        return;
                p = strrchr(path, '/');
@@ -815,19 +839,23 @@ static int zk_find_master(int *master_seq, char 
*master_name)
 {
        int rc, len = MAX_NODE_STR_LEN;
        char master_compete_path[MAX_NODE_STR_LEN];
+       char temp_char_form[MAX_NODE_STR_LEN] = "";
 
        if (*master_seq < 0) {
-               RETURN_IF_ERROR(zk_get_least_seq(MASTER_ZNODE,
+               RETURN_IF_ERROR(zk_get_least_seq(master_znode,
                                                 master_compete_path,
                                                 MAX_NODE_STR_LEN, master_name,
                                                 &len), "");
-               sscanf(master_compete_path, MASTER_ZNODE "/%"PRId32,
-                      master_seq);
+               strcpy(temp_char_form,master_znode);
+               strcat(temp_char_form,"/%"PRId32);
+               sscanf(master_compete_path, temp_char_form, master_seq);
                return ZOK;
        } else {
                while (true) {
+                       strcpy(temp_char_form,master_znode);
+                       strcat(temp_char_form,"/%010"PRId32);
                        snprintf(master_compete_path, len,
-                                MASTER_ZNODE "/%010"PRId32, *master_seq);
+                                temp_char_form, *master_seq);
                        rc = zk_get_data(master_compete_path, master_name,
                                         &len);
                        switch (rc) {
@@ -854,10 +882,12 @@ static int zk_verify_last_sheep_join(int seq, int 
*last_sheep)
 {
        int rc, len = MAX_NODE_STR_LEN;
        char path[MAX_NODE_STR_LEN], name[MAX_NODE_STR_LEN];
+       char temp_char_form[MAX_NODE_STR_LEN] = "";
 
        for (*last_sheep = seq - 1; *last_sheep >= 0; (*last_sheep)--) {
-               snprintf(path, MAX_NODE_STR_LEN, MASTER_ZNODE "/%010"PRId32,
-                        *last_sheep);
+               strcpy(temp_char_form, master_znode);
+               strcat(temp_char_form, "/%010"PRId32 );
+               snprintf(path, MAX_NODE_STR_LEN, temp_char_form, *last_sheep);
                rc = zk_get_data(path, name, &len);
                switch (rc) {
                case ZNONODE:
@@ -871,8 +901,9 @@ static int zk_verify_last_sheep_join(int seq, int 
*last_sheep)
 
                if (!strcmp(name, node_to_str(&this_node.node)))
                        continue;
-
-               snprintf(path, MAX_NODE_STR_LEN, MEMBER_ZNODE "/%s", name);
+               strcpy(temp_char_form, member_znode);
+               strcat(temp_char_form, "/%s");
+               snprintf(path, MAX_NODE_STR_LEN, temp_char_form, name);
                rc = zk_node_exists(path);
                switch (rc) {
                case ZOK:
@@ -898,7 +929,7 @@ static void zk_compete_master(void)
        char master_name[MAX_NODE_STR_LEN];
        char my_compete_path[MAX_NODE_STR_LEN];
        static int master_seq = -1, my_seq;
-
+       
        /*
         * This is to protect master_seq and my_seq because this function will
         * be called by both main thread and zookeeper's event thread.
@@ -909,23 +940,28 @@ static void zk_compete_master(void)
                goto out_unlock;
 
        if (!joined) {
+               char temp_char_form[MAX_NODE_STR_LEN] = "";
+               strcpy(temp_char_form, master_znode);
+               strcat(temp_char_form, "/" );
                sd_debug("start to compete master for the first time");
                do {
                        if (uatomic_is_true(&stop))
                                goto out_unlock;
                        /* duplicate sequential node has no side-effect */
-                       rc = zk_create_seq_node(MASTER_ZNODE "/",
+                       rc = zk_create_seq_node(temp_char_form,
                                                node_to_str(&this_node.node),
                                                MAX_NODE_STR_LEN,
                                                my_compete_path,
                                                MAX_NODE_STR_LEN, true);
                } while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS);
-               CHECK_ZK_RC(rc, MASTER_ZNODE "/");
+               CHECK_ZK_RC(rc, temp_char_form);
                if (rc != ZOK)
                        goto out_unlock;
-
+               
+               strcpy(temp_char_form, master_znode);
+               strcat(temp_char_form, "/%"PRId32 );
                sd_debug("my compete path: %s", my_compete_path);
-               sscanf(my_compete_path, MASTER_ZNODE "/%"PRId32,
+               sscanf(my_compete_path, temp_char_form,
                       &my_seq);
        }
 
@@ -964,10 +1000,12 @@ static int zk_join(const struct sd_node *myself,
 {
        int rc;
        char path[MAX_NODE_STR_LEN];
+       char temp_char_form[MAX_NODE_STR_LEN] = "";
 
        this_node.node = *myself;
-
-       snprintf(path, sizeof(path), MEMBER_ZNODE "/%s", node_to_str(myself));
+       strcpy(temp_char_form, member_znode );
+       strcat(temp_char_form, "/%s");
+       snprintf(path, sizeof(path), temp_char_form, node_to_str(myself));
        rc = zk_node_exists(path);
        if (rc == ZOK) {
                sd_err("Previous zookeeper session exist, shoot myself. Please "
@@ -985,17 +1023,21 @@ static int zk_join(const struct sd_node *myself,
 static int zk_leave(void)
 {
        char path[PATH_MAX];
+       char temp_char_form[MAX_NODE_STR_LEN] = "";
 
        sd_info("leaving from cluster");
        uatomic_set_true(&stop);
 
        if (uatomic_is_true(&is_master)) {
-               snprintf(path, sizeof(path), MASTER_ZNODE "/%010"PRId32,
-                               my_master_seq);
+               strcpy(temp_char_form, master_znode);
+               strcat(temp_char_form, "/%010"PRId32 );
+               snprintf(path, sizeof(path), temp_char_form, my_master_seq);
                zk_delete_node(path, -1);
        }
 
-       snprintf(path, sizeof(path), MEMBER_ZNODE"/%s",
+       strcpy(temp_char_form, member_znode);
+       strcat(temp_char_form, "/%s" );
+       snprintf(path, sizeof(path), temp_char_form,
                 node_to_str(&this_node.node));
        add_event(EVENT_LEAVE, &this_node, NULL, 0);
        lock_table_remove_znodes();
@@ -1038,9 +1080,9 @@ static void watch_all_nodes(void)
        struct String_vector strs;
        char path[MAX_NODE_STR_LEN];
 
-       RETURN_VOID_IF_ERROR(zk_get_children(MEMBER_ZNODE, &strs), "");
+       RETURN_VOID_IF_ERROR(zk_get_children(member_znode, &strs), "");
 
-       FOR_EACH_ZNODE(MEMBER_ZNODE, path, &strs) {
+       FOR_EACH_ZNODE(member_znode, path, &strs) {
                RETURN_VOID_IF_ERROR(zk_node_exists(path), "");
        }
 }
@@ -1066,6 +1108,7 @@ static void zk_handle_accept(struct zk_event *ev)
 {
        char path[MAX_NODE_STR_LEN];
        int rc;
+       char temp_char_form[MAX_NODE_STR_LEN] = "";
 
        sd_debug("ACCEPT");
        if (node_eq(&ev->sender.node, &this_node.node))
@@ -1074,7 +1117,9 @@ static void zk_handle_accept(struct zk_event *ev)
 
        sd_debug("%s", node_to_str(&ev->sender.node));
 
-       snprintf(path, sizeof(path), MEMBER_ZNODE"/%s",
+       strcpy(temp_char_form,member_znode);
+       strcat(temp_char_form, "/%s");
+       snprintf(path, sizeof(path), temp_char_form,
                 node_to_str(&ev->sender.node));
        if (node_eq(&ev->sender.node, &this_node.node)) {
                joined = true;
@@ -1286,18 +1331,22 @@ static void zk_lock(uint64_t lock_id)
        char lowest_seq_path[MAX_NODE_STR_LEN];
        char owner_name[MAX_NODE_STR_LEN];
        struct cluster_lock *cluster_lock;
+       char temp_char_form[MAX_NODE_STR_LEN] = "";
 
        cluster_lock = lock_table_lookup_acquire(lock_id);
 
        my_path = cluster_lock->lock_path;
 
-       snprintf(parent, MAX_NODE_STR_LEN, LOCK_ZNODE "/%"PRIu64"/",
+       strcpy(temp_char_form, lock_znode );
+       strcat(temp_char_form, "/%"PRIu64"/" );
+       snprintf(parent, MAX_NODE_STR_LEN, temp_char_form,
                 cluster_lock->id);
        /*
         * It need using path without end of '/' to create node of lock_id in
         * zookeeper's API, so we use 'parent_node'.
         */
-       snprintf(parent_node, MAX_NODE_STR_LEN, LOCK_ZNODE "/%"PRIu64,
+       temp_char_form[strlen(temp_char_form)-1] = '\0';
+       snprintf(parent_node, MAX_NODE_STR_LEN, temp_char_form,
                 cluster_lock->id);
 create_seq_node:
        /* compete owner of lock is just like zk_compete_master() */
@@ -1359,7 +1408,8 @@ static void zk_unlock(uint64_t lock_id)
 
 static int zk_init(const char *option)
 {
-       char *hosts, *to, *p;
+       char hosts[MAX_NODE_STR_LEN];
+       const char *pt, *pd;
        int ret, interval, retry = 0, max_retry;
 
        if (!option) {
@@ -1367,17 +1417,28 @@ static int zk_init(const char *option)
                return -1;
        }
 
-       hosts = strtok((char *)option, "=");
-       if ((to = strtok(NULL, "="))) {
-               if (sscanf(to, "%u", &zk_timeout) != 1) {
-                       sd_err("Invalid parameter for timeout");
-                       return -1;
-               }
-               p = strstr(hosts, "timeout");
-               *--p = '\0';
+       pt = strstr(option,"timeout=");
+       pd = strstr(option,"domain=");
+       if( pt==NULL && pd==NULL ){
+               strcpy(hosts,option);
+       }else if( pt ){
+               int i = 0;
+               while(option != pt)
+                       hosts[i++] = *option++;
+               hosts[i-1] = '\0';
+               sscanf(pt,"timeout=%d",&zk_timeout);
+               if( pd )
+                       sscanf(pd,"domain=%s",sd_domain);
+       }else{
+               int i = 0;
+               while(option != pd)
+                       hosts[i++] = *option++;
+               hosts[i-1] = '\0';
+               sscanf(pd,"domain=%s",sd_domain);
        }
-       sd_debug("version %d.%d.%d, address %s, timeout %d", ZOO_MAJOR_VERSION,
-                ZOO_MINOR_VERSION, ZOO_PATCH_VERSION, hosts, zk_timeout);
+       
+       sd_debug("version %d.%d.%d, address %s, timeout %d, sheepdog domain 
%s", ZOO_MAJOR_VERSION,
+                ZOO_MINOR_VERSION, ZOO_PATCH_VERSION, hosts, zk_timeout, 
sd_domain);
        zhandle = zookeeper_init(hosts, zk_watcher, zk_timeout, NULL, NULL, 0);
        if (!zhandle) {
                sd_err("failed to initialize zk server %s", option);
@@ -1398,6 +1459,18 @@ static int zk_init(const char *option)
 
        uatomic_set_false(&stop);
        uatomic_set_false(&is_master);
+        strcpy(master_znode,base_znode);
+        strcat(master_znode,"/");
+        strcat(master_znode,sd_domain);
+        strcat(master_znode,master_znode_post);
+        strcpy(queue_znode,base_znode);
+        strcat(queue_znode,"/");
+        strcat(queue_znode,sd_domain);
+        strcat(queue_znode,queue_znode_post);
+        strcpy(member_znode,base_znode);
+        strcat(member_znode,"/");
+        strcat(member_znode,sd_domain);
+        strcat(member_znode,member_znode_post);
        if (zk_queue_init() != ZOK)
                return -1;
 
@@ -1421,9 +1494,13 @@ static int zk_init(const char *option)
                sd_init_mutex(table_locks + i);
        }
 
-       ret = zk_init_node(LOCK_ZNODE);
+        strcpy(lock_znode,base_znode);
+        strcat(lock_znode,"/");
+        strcat(lock_znode,sd_domain);
+        strcat(lock_znode,lock_znode_post);
+       ret = zk_init_node(lock_znode);
        if (ret != ZOK) {
-               sd_err("Failed to create %s %s", LOCK_ZNODE, zerror(ret));
+               sd_err("Failed to create %s %s", lock_znode, zerror(ret));
                free(cluster_locks_table);
                return -1;
        }
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 7d5fa0f..a5ba208 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -68,12 +68,14 @@ static const char cluster_help[] =
 "\tlocal: use local driver\n"
 "\tcorosync: use corosync driver\n"
 "\tzookeeper: use zookeeper driver, need extra arguments\n"
-"\n\tzookeeper arguments: address-list,timeout=value (default as 3000)\n"
+"\n\tzookeeper arguments: address-list,timeout=value(default as 
3000),domain=value(default as sd_domain_default)\n"
 "\nExample:\n\t"
-"$ sheep -c zookeeper:IP1:PORT1,IP2:PORT2,IP3:PORT3,timeout=1000 ...\n"
+"$ sheep -c 
zookeeper:IP1:PORT1,IP2:PORT2,IP3:PORT3,timeout=1000,domain=sheep_domain ...\n"
 "This tries to use 3 node zookeeper cluster, which can be reached by\n"
 "IP1:PORT1, IP2:PORT2, IP3:PORT3 to manage membership and broadcast message\n"
-"and set the timeout of node heartbeat as 1000 milliseconds\n";
+"and set the timeout of node heartbeat as 1000 milliseconds\n"
+"and join the domain sheep_domain.\n"
+"Notice that timeout should be followed by domain if both are given 
explicitly.\n";
 
 static const char cache_help[] =
 "Available arguments:\n"
-- 
1.7.9.5



-- 
sheepdog mailing list
sheepdog@lists.wpkg.org
https://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to