ZooKeeper occupies more and more memory in our production environment because the nodes in /sheepdog/queue are never deleted, even after their messages have already been handled.
So, we need a tool to list and remove them periodically. Signed-off-by: Ruoyu <lian...@ucweb.com> --- tools/zk_control.c | 239 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 239 insertions(+) diff --git a/tools/zk_control.c b/tools/zk_control.c index 6b3b136..94c4277 100644 --- a/tools/zk_control.c +++ b/tools/zk_control.c @@ -13,6 +13,11 @@ #include <zookeeper/zookeeper.h> #include <string.h> +#include <arpa/inet.h> + +#include "list.h" +#include "rbtree.h" +#include "internal_proto.h" #define FOR_EACH_ZNODE(parent, path, strs) \ for ((strs)->data += (strs)->count; \ @@ -21,9 +26,98 @@ *--(strs)->data) : (free((strs)->data), 0); \ free(*(strs)->data)) +enum zk_event_type { + EVENT_JOIN = 1, + EVENT_ACCEPT, + EVENT_LEAVE, + EVENT_BLOCK, + EVENT_UNBLOCK, + EVENT_NOTIFY, + EVENT_UPDATE_NODE, +}; + +struct zk_node { + struct list_node list; + struct rb_node rb; + struct sd_node node; + bool callbacked; + bool gone; +}; + +#define ZK_MAX_BUF_SIZE (1*1024*1024) /* 1M */ + +struct zk_event { + uint64_t id; + enum zk_event_type type; + struct zk_node sender; + size_t msg_len; + size_t nr_nodes; + size_t buf_len; + uint8_t buf[ZK_MAX_BUF_SIZE]; +}; + static const char *hosts = "127.0.0.1:2181"; static zhandle_t *zk_handle; +static const char *evtype_to_str(int type) +{ + switch (type) { + case EVENT_JOIN: + return "JOIN"; + break; + case EVENT_ACCEPT: + return "ACCEPT"; + break; + case EVENT_LEAVE: + return "LEAVE"; + break; + case EVENT_BLOCK: + return "BLOCK"; + break; + case EVENT_UNBLOCK: + return "UNBLOCK"; + break; + case EVENT_NOTIFY: + return "NOTIFY"; + break; + case EVENT_UPDATE_NODE: + return "UPDATE_NODE"; + break; + default: + return "UNKNOWN"; + break; + } +} + +static const char *addr_to_str(const uint8_t *addr, uint16_t port) +{ + static __thread char str[HOST_NAME_MAX + 8]; + int af = AF_INET6; + int addr_start_idx = 0; + const char *ret; + + /* Find address family type */ + if (addr[12]) { + int oct_no = 0; + while (!addr[oct_no] && 
oct_no++ < 12) + ; + if (oct_no == 12) { + af = AF_INET; + addr_start_idx = 12; + } + } + ret = inet_ntop(af, addr + addr_start_idx, str, sizeof(str)); + if (unlikely(ret == NULL)) + fprintf(stderr, "failed to convert addr to string, %m\n"); + + if (port) { + int len = strlen(str); + snprintf(str + len, sizeof(str) - len, ":%d", port); + } + + return str; +} + static inline ZOOAPI int zk_delete_node(const char *path) { int rc; @@ -45,6 +139,18 @@ static inline ZOOAPI int zk_get_children(const char *path, return rc; } +static inline ZOOAPI int zk_get_data(const char *path, void *buffer, + int *buffer_len, struct Stat *stat) +{ + int rc; + do { + rc = zoo_get(zk_handle, path, 1, (char *)buffer, + buffer_len, stat); + } while (rc == ZOPERATIONTIMEOUT || rc == ZCONNECTIONLOSS); + + return rc; +} + static int do_kill(int argc, char **argv) { char *path; @@ -130,6 +236,137 @@ err: return -1; } +static int do_list_queue(int argc, char **argv) +{ +#define QUEUE_ZONE "/sheepdog/queue" + struct String_vector strs; + int rc, len, listed = 0; + char path[256], str_ctime[128], str_mtime[128]; + time_t t1, t2; + struct tm tm_ctime, tm_mtime; + struct zk_event ev; + struct Stat stat; + int32_t seq; + struct node_id *nid; + + fprintf(stdout, " QUEUE ID TYPE" + " SENDER MSG LEN NR BUF LEN" + " CREATE TIME MODIFY TIME\n"); + rc = zk_get_children(QUEUE_ZONE, &strs); + switch (rc) { + case ZOK: + FOR_EACH_ZNODE(QUEUE_ZONE, path, &strs) { + len = sizeof(struct zk_event); + rc = zk_get_data(path, &ev, &len, &stat); + if (rc != ZOK) { + fprintf(stderr, "failed to get data " + "%s, %s\n", + path, zerror(rc)); + goto err; + } + + t1 = stat.ctime / 1000; + localtime_r(&t1, &tm_ctime); + strftime(str_ctime, sizeof(str_ctime), + "%Y-%m-%d %H:%M:%S", &tm_ctime); + + t2 = stat.mtime / 1000; + localtime_r(&t2, &tm_mtime); + strftime(str_mtime, sizeof(str_mtime), + "%Y-%m-%d %H:%M:%S", &tm_mtime); + + sscanf(path, QUEUE_ZONE "/%"PRId32, &seq); + nid = &ev.sender.node.nid; + fprintf(stdout, 
"%010"PRId32" %016"PRIx64 + " %12s %21s %7zd %4zd %7zd %s %s\n", + seq, ev.id, evtype_to_str(ev.type), + addr_to_str(nid->addr, nid->port), + ev.msg_len, ev.nr_nodes, ev.buf_len, + str_ctime, str_mtime); + listed++; + } + break; + default: + goto err; + } + + fprintf(stdout, "\ntotal nodes: %d\n", listed); + return 0; +err: + fprintf(stderr, "failed to list %s, %s\n", QUEUE_ZONE, zerror(rc)); + return -1; +} + +static int do_remove_queue(int argc, char **argv) +{ +#define MIN_THRESHOLD 86400 + struct String_vector strs; + int rc, len, threshold, deleted = 0; + const char *node = "/sheepdog/queue"; + char *p, path[256]; + struct zk_event ev; + struct Stat stat; + struct timeval tv; + + if (argc != 3) { + fprintf(stderr, "remove queue: need specify " + "threshold in seconds\n"); + return -1; + } + + threshold = strtol(argv[2], &p, 10); + if (p == argv[2]) { + fprintf(stderr, "threshold must be a number\n"); + return -1; + } + if (threshold < MIN_THRESHOLD) { + threshold = MIN_THRESHOLD; + fprintf(stdout, "threshold is less than %d seconds, " + "set it to %d\n", MIN_THRESHOLD, MIN_THRESHOLD); + } + + gettimeofday(&tv, NULL); + + rc = zk_get_children(node, &strs); + switch (rc) { + case ZOK: + FOR_EACH_ZNODE(node, path, &strs) { + len = sizeof(struct zk_event); + rc = zk_get_data(path, &ev, &len, &stat); + if (rc != ZOK) { + fprintf(stderr, "failed to get data " + "%s, %s\n", + path, zerror(rc)); + goto err; + } + if (stat.mtime / 1000 >= tv.tv_sec - threshold) + continue; + + rc = zk_delete_node(path); + if (rc != ZOK) { + fprintf(stderr, "failed to delete " + "%s, %s\n", + path, zerror(rc)); + goto err; + } + + deleted++; + if (deleted % 100 == 0) + fprintf(stdout, "%d queue nodes are deleted\n", + deleted); + } + break; + default: + goto err; + } + + fprintf(stdout, "completed. 
%d queue nodes are deleted\n", deleted); + return 0; +err: + fprintf(stderr, "failed to list %s, %s\n", node, zerror(rc)); + return -1; +} + static struct control_handler { const char *name; int (*execute)(int, char **); @@ -137,6 +374,8 @@ static struct control_handler { } handlers[] = { { "kill", do_kill, "Kill the session" }, { "remove", do_remove, "Remove the node recursively" }, + { "lqueue", do_list_queue, "List the data in queue node" }, + { "rqueue", do_remove_queue, "Remove the data in queue node" }, { NULL, NULL, NULL }, }; -- 1.8.3.2 -- sheepdog mailing list sheepdog@lists.wpkg.org http://lists.wpkg.org/mailman/listinfo/sheepdog