Signed-off-by: MORITA Kazutaka <[email protected]>
---
 sheep/Makefile.am     |    2 +-
 sheep/cluster/local.c |  557 +++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 558 insertions(+), 1 deletions(-)
 create mode 100644 sheep/cluster/local.c

diff --git a/sheep/Makefile.am b/sheep/Makefile.am
index 85652dd..745fdde 100644
--- a/sheep/Makefile.am
+++ b/sheep/Makefile.am
@@ -24,7 +24,7 @@ INCLUDES		= -I$(top_builddir)/include -I$(top_srcdir)/include $(libcpg_CFLAGS) $
 sbin_PROGRAMS		= sheep
 
 sheep_SOURCES		= sheep.c group.c sdnet.c store.c vdi.c work.c journal.c ops.c \
-			  cluster/corosync.c
+			  cluster/corosync.c cluster/local.c
 sheep_LDADD	  	= $(libcpg_LIBS) $(libcfg_LIBS) ../lib/libsheepdog.a -lpthread
 sheep_DEPENDENCIES	= ../lib/libsheepdog.a
 
diff --git a/sheep/cluster/local.c b/sheep/cluster/local.c
new file mode 100644
index 0000000..890228d
--- /dev/null
+++ b/sheep/cluster/local.c
@@ -0,0 +1,557 @@
+/*
+ * Copyright (C) 2011 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/*
+ * Local cluster driver.
+ *
+ * Sheep processes exchange join/leave/notify events through a ring of
+ * events stored in a file (shmfile) that every process maps with
+ * MAP_SHARED.  Access is serialized with flock() on that file, and
+ * SIGUSR1 (received through a signalfd) tells the processes recorded in
+ * the newest event that the queue changed.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <sys/mman.h>
+#include <sys/signalfd.h>
+#include <sys/file.h>
+#include <search.h>
+#include <signal.h>
+#include <fcntl.h>
+#include <assert.h>
+
+#include "cluster.h"
+#include "event.h"
+#include "work.h"
+
+#define MAX_EVENTS 500
+#define MAX_EVENT_BUF_SIZE (64 * 1024)
+
+const char *shmfile = "/tmp/sheepdog_shm";
+static int shmfd;	/* descriptor of shmfile, also used for flock() */
+static int sigfd;	/* signalfd that receives SIGUSR1 */
+static int event_pos;	/* index of the last event this process consumed */
+static struct sheepdog_node_list_entry this_node;
+
+static struct work_queue *local_block_wq;
+
+static struct cdrv_handlers lhdlrs;
+static enum cluster_join_result (*local_check_join_cb)(
+	struct sheepdog_node_list_entry *joining, void *opaque);
+
+enum local_event_type {
+	EVENT_JOIN = 1,
+	EVENT_LEAVE,
+	EVENT_NOTIFY,
+};
+
+struct local_event {
+	enum local_event_type type;
+	struct sheepdog_node_list_entry sender;
+
+	size_t buf_len;
+	uint8_t buf[MAX_EVENT_BUF_SIZE];
+
+	size_t nr_nodes; /* the number of sheep processes */
+	struct sheepdog_node_list_entry nodes[SD_MAX_NODES];
+	pid_t pids[SD_MAX_NODES];
+
+	enum cluster_join_result join_result;
+
+	void (*block_cb)(void *arg);
+
+	int blocked; /* set non-zero when sheep must block this event */
+	int callbacked; /* set non-zero if sheep already called block_cb() */
+};
+
+
+/* shared memory queue */
+
+struct shm_queue {
+	uint64_t chksum;	/* checksum of the newest event, events[pos] */
+
+	int pos;		/* index of the newest event */
+	struct local_event events[MAX_EVENTS];
+} *shm_queue;
+
+/* Take the exclusive flock() on the shared file. */
+static void shm_queue_lock(void)
+{
+	flock(shmfd, LOCK_EX);
+}
+
+/* Drop the flock() on the shared file. */
+static void shm_queue_unlock(void)
+{
+	flock(shmfd, LOCK_UN);
+}
+
+/* Return non-zero when this process has consumed every queued event. */
+static int shm_queue_empty(void)
+{
+	return event_pos == shm_queue->pos;
+}
+
+/*
+ * Copy the membership recorded in the newest event into n and p (either
+ * may be NULL) and return the number of nodes.  n and p must have room for
+ * SD_MAX_NODES entries because the whole arrays are copied.
+ */
+static size_t get_nodes(struct sheepdog_node_list_entry *n, pid_t *p)
+{
+	struct local_event *ev;
+
+	ev = shm_queue->events + shm_queue->pos;
+
+	if (n)
+		memcpy(n, ev->nodes, sizeof(ev->nodes));
+	if (p)
+		memcpy(p, ev->pids, sizeof(ev->pids));
+
+	return ev->nr_nodes;
+}
+
+/* Return non-zero if signal 0 can be delivered to pid. */
+static int process_exists(pid_t pid)
+{
+	return kill(pid, 0) == 0;
+}
+
+/* Return the oldest unconsumed event without consuming it, or NULL. */
+static struct local_event *shm_queue_peek(void)
+{
+	if (shm_queue_empty())
+		return NULL;
+
+	return shm_queue->events + (event_pos + 1) % MAX_EVENTS;
+}
+
+/* Append a copy of *ev as the newest event. */
+static void shm_queue_push(struct local_event *ev)
+{
+	shm_queue->pos = (shm_queue->pos + 1) % MAX_EVENTS;
+	shm_queue->events[shm_queue->pos] = *ev;
+
+	msync(shm_queue->events + shm_queue->pos, sizeof(*ev), MS_SYNC);
+	msync(&shm_queue->pos, sizeof(shm_queue->pos), MS_SYNC);
+}
+
+/* Consume the oldest unconsumed event and return it, or NULL. */
+static struct local_event *shm_queue_pop(void)
+{
+	if (shm_queue_empty())
+		return NULL;
+
+	event_pos = (event_pos + 1) % MAX_EVENTS;
+
+	return shm_queue->events + event_pos;
+}
+
+/* Hash the newest event with fnv_64a_buf(). */
+static uint64_t shm_queue_calc_chksum(void)
+{
+	return fnv_64a_buf(shm_queue->events + shm_queue->pos,
+			   sizeof(*shm_queue->events), FNV1A_64_INIT);
+}
+
+/* Store the checksum of the newest event in the queue header. */
+static void shm_queue_set_chksum(void)
+{
+	shm_queue->chksum = shm_queue_calc_chksum();
+	msync(&shm_queue->chksum, sizeof(shm_queue->chksum), MS_SYNC);
+}
+
+/* Refresh the checksum and send SIGUSR1 to every pid in the newest event. */
+static void shm_queue_notify(void)
+{
+	size_t i, nr;
+	pid_t pids[SD_MAX_NODES];
+
+	shm_queue_set_chksum();
+
+	nr = get_nodes(NULL, pids);
+
+	for (i = 0; i < nr; i++)
+		kill(pids[i], SIGUSR1);
+}
+
+/*
+ * Return non-zero when the mapped queue can be reused: the checksum must
+ * match, and either no node is recorded or at least one recorded process
+ * still exists.
+ */
+static int is_shm_queue_valid(void)
+{
+	size_t i, nr;
+	pid_t pids[SD_MAX_NODES];
+
+	if (shm_queue->chksum != shm_queue_calc_chksum()) {
+		dprintf("invalid shm queue\n");
+		return 0;
+	}
+
+	nr = get_nodes(NULL, pids);
+
+	if (nr == 0)
+		return 1;
+
+	for (i = 0; i < nr; i++)
+		if (process_exists(pids[i]))
+			return 1;
+
+	return 0;
+}
+
+/*
+ * Open and map the shared file.  An existing valid queue is joined at its
+ * current position; otherwise the file is reset to an empty queue.
+ */
+static void shm_queue_init(void)
+{
+	int ret;
+
+	shmfd = open(shmfile, O_CREAT | O_RDWR, 0644);
+	assert(shmfd >= 0);
+
+	shm_queue_lock();
+
+	ret = ftruncate(shmfd, sizeof(*shm_queue));
+	assert(ret == 0);
+
+	shm_queue = mmap(0, sizeof(*shm_queue),
+			 PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0);
+	assert(shm_queue != MAP_FAILED);
+
+	if (is_shm_queue_valid())
+		event_pos = shm_queue->pos;
+	else {
+		/* initialize shared memory */
+		event_pos = 0;
+		memset(shm_queue, 0, sizeof(*shm_queue));
+		ret = ftruncate(shmfd, 0);
+		assert(ret == 0);
+		ret = ftruncate(shmfd, sizeof(*shm_queue));
+		assert(ret == 0);
+
+		shm_queue_set_chksum();
+	}
+
+	shm_queue_unlock();
+}
+
+/*
+ * Build an event of the given type sent by node and append it to the queue,
+ * then notify the members.  The new event starts from the membership of
+ * the newest event: a join appends node (with the caller's pid), a leave
+ * removes it.  buf may be NULL.  Callers hold the queue lock.
+ */
+static void add_event(enum local_event_type type,
+		      struct sheepdog_node_list_entry *node, void *buf,
+		      size_t buf_len, void (*block_cb)(void *arg))
+{
+	int idx;
+	struct sheepdog_node_list_entry *n;
+	pid_t *p;
+	struct local_event ev = {
+		.type = type,
+		.sender = *node,
+	};
+
+	/* ev.buf is a fixed-size array; never copy past its end */
+	if (buf_len > sizeof(ev.buf))
+		panic("too large event data\n");
+
+	ev.buf_len = buf_len;
+	if (buf)
+		memcpy(ev.buf, buf, buf_len);
+
+	ev.nr_nodes = get_nodes(ev.nodes, ev.pids);
+
+	switch (type) {
+	case EVENT_JOIN:
+		/* the node arrays hold at most SD_MAX_NODES entries */
+		if (ev.nr_nodes >= SD_MAX_NODES)
+			panic("too many nodes\n");
+
+		ev.blocked = 1;
+		ev.nodes[ev.nr_nodes] = *node;
+		ev.pids[ev.nr_nodes] = getpid(); /* must be local node */
+		ev.nr_nodes++;
+		break;
+	case EVENT_LEAVE:
+		n = lfind(node, ev.nodes, &ev.nr_nodes, sizeof(*n), node_cmp);
+		if (!n)
+			panic("internal error\n");
+		idx = n - ev.nodes;
+		p = ev.pids + idx;
+
+		ev.nr_nodes--;
+		memmove(n, n + 1, sizeof(*n) * (ev.nr_nodes - idx));
+		memmove(p, p + 1, sizeof(*p) * (ev.nr_nodes - idx));
+		break;
+	case EVENT_NOTIFY:
+		ev.blocked = !!block_cb;
+		ev.block_cb = block_cb;
+		break;
+	}
+
+	shm_queue_push(&ev);
+
+	shm_queue_notify();
+}
+
+/*
+ * Timer callback: queue a leave event for every recorded member whose
+ * process no longer exists, then re-arm the timer passed in arg.
+ */
+static void check_pids(void *arg)
+{
+	size_t i, nr;
+	struct sheepdog_node_list_entry nodes[SD_MAX_NODES];
+	pid_t pids[SD_MAX_NODES];
+
+	shm_queue_lock();
+
+	nr = get_nodes(nodes, pids);
+
+	for (i = 0; i < nr; i++)
+		if (!process_exists(pids[i]))
+			add_event(EVENT_LEAVE, nodes + i, NULL, 0, NULL);
+
+	shm_queue_unlock();
+
+	add_timer(arg, 1);
+}
+
+
+/* Local driver APIs */
+
+/*
+ * Set up the driver: remember the handlers, report 127.0.0.1 in myaddr,
+ * attach to the shared queue and start the periodic liveness check.
+ * SIGUSR1 is blocked and delivered through a non-blocking signalfd.
+ *
+ * Returns the signalfd on success, or -1 if it cannot be created.
+ */
+static int local_init(struct cdrv_handlers *handlers, uint8_t *myaddr)
+{
+	sigset_t mask;
+	static struct timer t = {
+		.callback = check_pids,
+		.data = &t,
+	};
+
+	lhdlrs = *handlers;
+
+	/* set 127.0.0.1 */
+	memset(myaddr, 0, 16);
+	myaddr[12] = 127;
+	myaddr[15] = 1;
+
+	shm_queue_init();
+
+	sigemptyset(&mask);
+	sigaddset(&mask, SIGUSR1);
+	sigprocmask(SIG_BLOCK, &mask, NULL);
+
+	sigfd = signalfd(-1, &mask, SFD_NONBLOCK);
+	if (sigfd < 0) {
+		eprintf("failed to create a signal fd, %m\n");
+		/* success returns a descriptor, so failure must be negative */
+		return -1;
+	}
+
+	add_timer(&t, 1);
+
+	local_block_wq = init_work_queue(1);
+
+	return sigfd;
+}
+
+/*
+ * Queue a join event for myself carrying opaque (opaque_len bytes).
+ * check_join_cb is saved and later run by local_dispatch().  Returns 0.
+ */
+static int local_join(struct sheepdog_node_list_entry *myself,
+		      enum cluster_join_result (*check_join_cb)(
+			      struct sheepdog_node_list_entry *joining,
+			      void *opaque),
+		      void *opaque, size_t opaque_len)
+{
+	this_node = *myself;
+	local_check_join_cb = check_join_cb;
+
+	shm_queue_lock();
+
+	add_event(EVENT_JOIN, &this_node, opaque, opaque_len, NULL);
+
+	shm_queue_unlock();
+
+	return 0;
+}
+
+/* Queue a leave event for this node.  Returns 0. */
+static int local_leave(void)
+{
+	shm_queue_lock();
+
+	add_event(EVENT_LEAVE, &this_node, NULL, 0, NULL);
+
+	shm_queue_unlock();
+
+	return 0;
+}
+
+/*
+ * Queue a notify event carrying msg (msg_len bytes).  A non-NULL block_cb
+ * marks the event as blocked until local_block() has run the callback.
+ * Returns 0.
+ */
+static int local_notify(void *msg, size_t msg_len, void (*block_cb)(void *arg))
+{
+	shm_queue_lock();
+
+	add_event(EVENT_NOTIFY, &this_node, msg, msg_len, block_cb);
+
+	shm_queue_unlock();
+
+	return 0;
+}
+
+/*
+ * Work function: run block_cb on the data of the event at the head of the
+ * queue, clear its blocked flag and notify the members.
+ */
+static void local_block(struct work *work, int idx)
+{
+	struct local_event *ev;
+
+	shm_queue_lock();
+
+	ev = shm_queue_peek();
+	/* shm_queue_peek() returns NULL when there is nothing to unblock */
+	if (!ev)
+		goto out;
+
+	ev->block_cb(ev->buf);
+	ev->blocked = 0;
+	msync(ev, sizeof(*ev), MS_SYNC);
+
+	shm_queue_notify();
+out:
+	shm_queue_unlock();
+}
+
+/* Completion callback of the block work; nothing to do. */
+static void local_block_done(struct work *work, int idx)
+{
+}
+
+/*
+ * Handle one SIGUSR1 wakeup: look at the event at the head of the queue
+ * and either deliver it to the registered handler and consume it, or leave
+ * it queued while it is still blocked.  Always returns 0.
+ */
+static int local_dispatch(void)
+{
+	int ret;
+	struct signalfd_siginfo siginfo;
+	struct local_event *ev;
+	enum cluster_join_result res;
+	static struct work work = {
+		.fn = local_block,
+		.done = local_block_done,
+	};
+
+	dprintf("read siginfo\n");
+	ret = read(sigfd, &siginfo, sizeof(siginfo));
+	assert(ret == sizeof(siginfo));
+
+	shm_queue_lock();
+
+	ev = shm_queue_peek();
+	if (!ev)
+		goto out;
+
+	switch (ev->type) {
+	case EVENT_JOIN:
+		if (ev->blocked) {
+			/* only the first node in the list decides on the join */
+			if (node_cmp(&ev->nodes[0], &this_node) == 0) {
+				res = local_check_join_cb(&ev->sender, ev->buf);
+				ev->join_result = res;
+				ev->blocked = 0;
+				msync(ev, sizeof(*ev), MS_SYNC);
+
+				shm_queue_notify();
+
+				if (res == CJ_RES_MASTER_TRANSFER) {
+					eprintf("Restart me after master is up.\n");
+					shm_queue_unlock();
+					exit(1);
+				}
+			}
+			goto out;
+		}
+
+		if (ev->join_result == CJ_RES_MASTER_TRANSFER) {
+			/* FIXME: This code is tricky, but Sheepdog assumes that */
+			/* nr_nodes = 1 when join_result = MASTER_TRANSFER... */
+			ev->nr_nodes = 1;
+			ev->nodes[0] = this_node;
+			ev->pids[0] = getpid();
+
+			shm_queue_set_chksum();
+		}
+
+		lhdlrs.join_handler(&ev->sender, ev->nodes, ev->nr_nodes,
+				    ev->join_result, ev->buf);
+		break;
+	case EVENT_LEAVE:
+		lhdlrs.leave_handler(&ev->sender, ev->nodes, ev->nr_nodes);
+		break;
+	case EVENT_NOTIFY:
+		if (ev->blocked) {
+			/* only the sender runs block_cb, and only once */
+			if (node_cmp(&ev->sender, &this_node) == 0) {
+				if (!ev->callbacked) {
+					queue_work(local_block_wq, &work);
+
+					ev->callbacked = 1;
+				}
+			}
+			goto out;
+		}
+
+		lhdlrs.notify_handler(&ev->sender, ev->buf, ev->buf_len);
+		break;
+	}
+
+	shm_queue_pop();
+out:
+	shm_queue_unlock();
+
+	return 0;
+}
+
+struct cluster_driver cdrv_local = {
+	.name = "local",
+
+	.init = local_init,
+	.join = local_join,
+	.leave = local_leave,
+	.notify = local_notify,
+	.dispatch = local_dispatch,
+};
+
+cdrv_register(cdrv_local);
-- 
1.7.2.5

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog
