Use the ZooKeeper distributed lock to prevent events from being lost.

Signed-off-by: Yunkai Zhang <[email protected]>
---
 sheep/cluster/zookeeper.c |   63 +++++++++++++++++++++++++++++++++++----------
 1 files changed, 49 insertions(+), 14 deletions(-)

diff --git a/sheep/cluster/zookeeper.c b/sheep/cluster/zookeeper.c
index 23ce4ab..c2592d2 100644
--- a/sheep/cluster/zookeeper.c
+++ b/sheep/cluster/zookeeper.c
@@ -72,8 +72,10 @@ static void zk_lock(zhandle_t *zh)
 again:
        rc = zoo_create(zh, LOCK_ZNODE, "", 0, &ZOO_OPEN_ACL_UNSAFE,
                        ZOO_EPHEMERAL, NULL, 0);
-       if (rc == ZOK)
+       if (rc == ZOK){
+               dprintf("locked\n");
                return;
+       }
        else if (rc == ZNODEEXISTS) {
                dprintf("retry\n");
                usleep(10000); /* FIXME: use watch notification */
@@ -89,11 +91,14 @@ static void zk_unlock(zhandle_t *zh)
        rc = zoo_delete(zh, LOCK_ZNODE, -1);
        if (rc != ZOK)
                panic("failed to release lock\n");
+
+       dprintf("unlocked\n");
 }
 
 
 /* ZooKeeper-based queue */
 
+static int efd;
 static int queue_pos;
 
 static int zk_queue_empty(zhandle_t *zh)
@@ -114,19 +119,27 @@ static void zk_queue_push(zhandle_t *zh, struct zk_event 
*ev)
 {
        int rc;
        char path[256], buf[256];
+       eventfd_t value = 1;
+
+       zk_lock(zh);
 
        sprintf(path, "%s/", QUEUE_ZNODE);
        rc = zoo_create(zh, path, (char *)ev, sizeof(*ev),
                        &ZOO_OPEN_ACL_UNSAFE, ZOO_SEQUENCE, buf, sizeof(buf));
+       assert(rc == ZOK);
 
+       dprintf("create path:%s, queue_pos:%d\n", buf, queue_pos);
        if (queue_pos < 0) {
                /* the first pushed data should be EVENT_JOIN */
                assert(ev->type == EVENT_JOIN);
                sscanf(buf, QUEUE_ZNODE "/%010d", &queue_pos);
 
-               /* watch */
-               zoo_exists(zh, buf, 1, NULL);
+               /* manual notify */
+               dprintf("write event to efd:%d\n", efd);
+               eventfd_write(efd, value);
        }
+
+       zk_unlock(zh);
 }
 
 static int zk_queue_push_back(zhandle_t *zh, struct zk_event *ev)
@@ -134,33 +147,57 @@ static int zk_queue_push_back(zhandle_t *zh, struct 
zk_event *ev)
        int rc;
        char path[256];
 
+       zk_lock(zh);
+
        queue_pos--;
 
        if (ev) {
                /* update the last popped data */
                sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
+               dprintf("update path:%s, queue_pos:%d\n", path, queue_pos);
                rc = zoo_set(zh, path, (char *)ev, sizeof(*ev), -1);
+               assert(rc == ZOK);
+
        }
 
+       zk_unlock(zh);
+
        return 0;
 }
 
 static int zk_queue_pop(zhandle_t *zh, struct zk_event *ev)
 {
-       int rc, len;
+       int rc, len, pos;
        char path[256];
+       eventfd_t value = 1;
 
        if (zk_queue_empty(zh))
                return -1;
 
+       zk_lock(zh);
+
        sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
        len = sizeof(*ev);
+       dprintf("read path:%s\n", path);
        rc = zoo_get(zh, path, 1, (char *)ev, &len, NULL);
+       assert(rc == ZOK);
 
-       /* watch next data */
-       queue_pos++;
-       sprintf(path, QUEUE_ZNODE "/%010d", queue_pos);
-       zoo_exists(zh, path, 1, NULL);
+       pos = queue_pos++;
+       do {
+               /* watch next data */
+               pos++;
+               sprintf(path, QUEUE_ZNODE "/%010d", pos);
+               dprintf("watch path:%s\n", path);
+
+               rc = zoo_exists(zh, path, 1, NULL);
+               if (rc == ZOK) {
+                       /* we lost this message, manual notify */
+                       dprintf("write event to efd:%d\n", efd);
+                       eventfd_write(efd, value);
+               }
+       } while (rc == ZOK);
+
+       zk_unlock(zh);
 
        return 0;
 }
@@ -219,7 +256,6 @@ static void zk_queue_init(zhandle_t *zh)
 /* ZooKeeper driver APIs */
 
 static zhandle_t *zhandle;
-static int efd;
 
 static struct work_queue *zk_block_wq;
 
@@ -262,8 +298,6 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
        struct sd_node *n;
        struct zk_event ev;
 
-       zk_lock(zh);
-
        ev.type = type;
        ev.sender = *node;
        ev.buf_len = buf_len;
@@ -272,6 +306,7 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
                memcpy(ev.buf, buf, buf_len);
 
        ev.nr_nodes = get_nodes(zh, ev.nodes);
+       dprintf("get_nodes, nr_nodes:%d\n", ev.nr_nodes);
 
        switch (type) {
        case EVENT_JOIN:
@@ -296,8 +331,6 @@ static int add_event(zhandle_t *zh, enum zk_event_type type,
 
        zk_queue_push(zh, &ev);
 out:
-       zk_unlock(zh);
-
        return 0;
 }
 
@@ -447,11 +480,13 @@ static int zk_notify(void *msg, size_t msg_len, void 
(*block_cb)(void *arg))
 
 static void zk_block(struct work *work)
 {
+       int rc;
        struct zk_event ev;
 
        pthread_mutex_lock(&queue_lock);
 
-       zk_queue_pop(zhandle, &ev);
+       rc = zk_queue_pop(zhandle, &ev);
+       assert(rc == 0);
 
        ev.block_cb(ev.buf);
        ev.blocked = 0;
-- 
1.7.7.6

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to