This code was originally part of dog (puppy).
It supports cluster communication and VDI manipulation.

Signed-off-by: MORITA Kazutaka <[email protected]>
---
 collie/group.c |  761 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 collie/vdi.c   |  259 +++++++++++++++++++
 2 files changed, 1020 insertions(+), 0 deletions(-)
 create mode 100644 collie/group.c
 create mode 100644 collie/vdi.c

diff --git a/collie/group.c b/collie/group.c
new file mode 100644
index 0000000..a718afd
--- /dev/null
+++ b/collie/group.c
@@ -0,0 +1,761 @@
+/*
+ * Copyright (C) 2009 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <netdb.h>
+#include <sys/time.h>
+#include <corosync/cpg.h>
+
+#include "sheepdog_proto.h"
+#include "collie.h"
+#include "list.h"
+#include "util.h"
+#include "meta.h"
+#include "logger.h"
+#include "work.h"
+
+/*
+ * A VDI that is currently locked (SD_OP_LOCK_VDI), kept on
+ * cluster_info->vm_list until SD_OP_RELEASE_VDI removes it.
+ */
+struct vm {
+       struct sheepdog_vm_list_entry ent;
+       struct list_head list;
+};
+
+/*
+ * A cluster member tracked on cluster_info->node_list.  nodeid/pid are the
+ * CPG identity compared against cpg_address entries on membership changes.
+ */
+struct node {
+       uint32_t nodeid;
+       uint32_t pid;
+       struct sheepdog_node_list_entry ent;
+       struct list_head list;
+};
+
+/*
+ * Header of every message multicast to the CPG group.  A message is first
+ * sent with done == 0; the master handles it, sets done = 1 and multicasts
+ * it again so that every node applies the result.  msg_length is the total
+ * message size in bytes and from identifies the originating node.
+ */
+struct message_header {
+       uint8_t op;
+       uint8_t done;
+       uint8_t pad[2];
+       uint32_t msg_length;
+       struct sheepdog_node_list_entry from;
+};
+
+/*
+ * SD_MSG_JOIN: multicast by a node that has just joined the group.  The
+ * master fills in epoch, nr_nodes and nodes[] with its current view of the
+ * cluster (join()), and the new node adopts that view when the completed
+ * message comes back (update_cluster_info()).
+ * NOTE(review): master_node is not filled in anywhere in this file.
+ */
+struct join_message {
+       struct message_header header;
+       uint32_t nodeid;
+       uint32_t pid;
+       struct sheepdog_node_list_entry master_node;
+       uint32_t epoch;
+       uint32_t nr_nodes;
+       struct {
+               uint32_t nodeid;
+               uint32_t pid;
+               struct sheepdog_node_list_entry ent;
+       } nodes[SD_MAX_NODES];
+};
+
+/*
+ * SD_MSG_VDI_OP: a client VDI request forwarded to the group.  The master
+ * executes it in vdi_op() and stores the outcome in rsp; data[] carries
+ * the request payload (header.msg_length covers it).
+ */
+struct vdi_op_message {
+       struct message_header header;
+       struct sd_vdi_req req;
+       struct sd_vdi_rsp rsp;
+       uint8_t data[0];
+};
+
+/*
+ * Deferred handling of one delivered CPG message; msg is a private copy
+ * made in sd_deliver() and freed in __sd_deliver_done().
+ */
+struct work_deliver {
+       struct message_header *msg;
+
+       struct cluster_info *ci;
+       struct work work;
+};
+
+/*
+ * Deferred handling of a CPG configuration change; the three address
+ * lists are private copies made in sd_confch() and freed in
+ * __sd_confch_done().
+ */
+struct work_confch {
+       struct cpg_address *member_list;
+       size_t member_list_entries;
+       struct cpg_address *left_list;
+       size_t left_list_entries;
+       struct cpg_address *joined_list;
+       size_t joined_list_entries;
+
+       struct cluster_info *ci;
+       struct work work;
+};
+
+/* qsort()/bsearch() comparator: order node list entries by their id bytes. */
+static int node_cmp(const void *a, const void *b)
+{
+       const struct sheepdog_node_list_entry *lhs = a, *rhs = b;
+
+       return memcmp(lhs->id, rhs->id, sizeof(lhs->id));
+}
+
+/*
+ * Multicast msg (msg->msg_length bytes) to the group with agreed ordering.
+ * CS_ERR_TRY_AGAIN is retried after a one second pause; any other error is
+ * logged.  Returns 0 on success, -1 on failure.
+ */
+static int send_message(cpg_handle_t handle, struct message_header *msg)
+{
+       struct iovec iov = {
+               .iov_base = msg,
+               .iov_len = msg->msg_length,
+       };
+       int ret;
+
+       for (;;) {
+               ret = cpg_mcast_joined(handle, CPG_TYPE_AGREED, &iov, 1);
+               if (ret != CS_ERR_TRY_AGAIN)
+                       break;
+               dprintf("failed to send message. try again\n");
+               sleep(1);
+       }
+
+       if (ret != CS_OK) {
+               eprintf("failed to send message, %d\n", ret);
+               return -1;
+       }
+       return 0;
+}
+
+
+/*
+ * Return the index of ent within the node_cmp()-sorted array entries of
+ * nr_nodes elements, or -1 if it is not present.
+ */
+static int get_node_idx(struct sheepdog_node_list_entry *ent,
+                       struct sheepdog_node_list_entry *entries, int nr_nodes)
+{
+       struct sheepdog_node_list_entry *found;
+
+       found = bsearch(ent, entries, nr_nodes, sizeof(*ent), node_cmp);
+       return found ? (int)(found - entries) : -1;
+}
+
+/*
+ * SD_OP_GET_NODE_LIST: copy the sorted node list into data and report its
+ * byte length, the node count, this node's index and the master's index
+ * (the master is the first node on node_list; -1 when the list is empty).
+ * NOTE(review): assumes data can hold every node entry — confirm the
+ * caller's buffer size.
+ */
+static void get_node_list(struct cluster_info *cluster,
+                         struct sd_node_req *req,
+                         struct sd_node_rsp *rsp, void *data)
+{
+       int nr_nodes;
+       struct node *node;
+
+       nr_nodes = build_node_list(&cluster->node_list, data);
+       rsp->data_length = nr_nodes * sizeof(struct sheepdog_node_list_entry);
+       rsp->nr_nodes = nr_nodes;
+       rsp->local_idx = get_node_idx(&cluster->this_node, data, nr_nodes);
+
+       if (list_empty(&cluster->node_list)) {
+               rsp->master_idx = -1;
+               return;
+       }
+       node = list_first_entry(&cluster->node_list, struct node, list);
+       rsp->master_idx = get_node_idx(&node->ent, data, nr_nodes);
+}
+
+/*
+ * SD_OP_GET_VM_LIST: copy every locked-VM entry into data and set
+ * rsp->data_length to the number of bytes written.
+ * NOTE(review): nothing bounds how many entries are written to data;
+ * presumably the caller's buffer is large enough — confirm.
+ */
+static void get_vm_list(struct cluster_info *cluster, struct sd_rsp *rsp,
+                       void *data)
+{
+       struct sheepdog_vm_list_entry *out = data;
+       struct vm *vm;
+       int count = 0;
+
+       list_for_each_entry(vm, &cluster->vm_list, list) {
+               out[count] = vm->ent;
+               count++;
+       }
+
+       rsp->data_length = count * sizeof(struct sheepdog_vm_list_entry);
+}
+
+/*
+ * Work handler for a client request.  Node and VM list queries are answered
+ * locally.  Every other opcode is wrapped in an SD_MSG_VDI_OP message,
+ * multicast to the group, and the request is parked on
+ * cluster->pending_list until vdi_op_done() completes it.
+ */
+void cluster_queue_request(struct work *work, int idx)
+{
+       struct request *req = container_of(work, struct request, work);
+       struct sd_req *hdr = (struct sd_req *)&req->rq;
+       struct sd_rsp *rsp = (struct sd_rsp *)&req->rp;
+       struct cluster_info *cluster = req->ci->cluster;
+       struct vdi_op_message *msg;
+       size_t msg_len;
+
+       dprintf("%p %x\n", req, hdr->opcode);
+
+       switch (hdr->opcode) {
+       case SD_OP_GET_NODE_LIST:
+               get_node_list(cluster, (struct sd_node_req *)hdr,
+                             (struct sd_node_rsp *)rsp, req->data);
+               rsp->result = SD_RES_SUCCESS;
+               return;
+       case SD_OP_GET_VM_LIST:
+               get_vm_list(cluster, rsp, req->data);
+               rsp->result = SD_RES_SUCCESS;
+               return;
+       default:
+               /* forward request to group */
+               break;
+       }
+
+       msg_len = sizeof(*msg) + hdr->data_length;
+       msg = zalloc(msg_len);
+       if (!msg) {
+               eprintf("out of memory\n");
+               rsp->result = SD_RES_SYSTEM_ERROR;
+               return;
+       }
+
+       msg->header.op = SD_MSG_VDI_OP;
+       msg->header.done = 0;
+       msg->header.msg_length = msg_len;
+       msg->header.from = cluster->this_node;
+       msg->req = *((struct sd_vdi_req *)&req->rq);
+       msg->rsp = *((struct sd_vdi_rsp *)&req->rp);
+       if (hdr->flags & SD_FLAG_CMD_WRITE)
+               memcpy(msg->data, req->data, hdr->data_length);
+
+       /*
+        * vdi_op_done() completes the request at the head of pending_list.
+        * Replies to this node's messages come back in the order they were
+        * sent (agreed ordering, assuming deliveries are processed in order),
+        * so new requests must be queued at the tail to keep FIFO matching.
+        */
+       list_add_tail(&req->pending_list, &cluster->pending_list);
+
+       if (send_message(cluster->handle, (struct message_header *)msg) < 0) {
+               /* no reply will ever arrive; don't leave a stale entry */
+               list_del(&req->pending_list);
+               rsp->result = SD_RES_SYSTEM_ERROR;
+       }
+
+       free(msg);
+}
+
+/* Return the locked-VM entry on entries whose name equals name, or NULL. */
+static struct vm *lookup_vm(struct list_head *entries, char *name)
+{
+       struct vm *cur, *found = NULL;
+
+       list_for_each_entry(cur, entries, list) {
+               if (strcmp((char *)cur->ent.name, name) == 0) {
+                       found = cur;
+                       break;
+               }
+       }
+
+       return found;
+}
+
+/* Event-loop callback for the CPG fd: dispatch every pending CPG event. */
+static void group_handler(int listen_fd, int events, void *data)
+{
+       cpg_dispatch(((struct cluster_info *)data)->handle, CPG_DISPATCH_ALL);
+}
+
+/*
+ * Debug dump of node_list.  The local node is marked with 'l'; the address
+ * is printed from the last four bytes of ent.addr.
+ */
+static void print_node_list(struct cluster_info *ci)
+{
+       struct node *n;
+
+       list_for_each_entry(n, &ci->node_list, list) {
+               char mark = node_cmp(&n->ent, &ci->this_node) == 0 ? 'l' : ' ';
+
+               dprintf("%c nodeid: %x, pid: %d, ip: %d.%d.%d.%d:%d\n",
+                       mark, n->nodeid, n->pid,
+                       n->ent.addr[12], n->ent.addr[13],
+                       n->ent.addr[14], n->ent.addr[15], n->ent.port);
+       }
+}
+
+/*
+ * Append a node to ci->node_list and bump the epoch.  On allocation failure
+ * the node is dropped (after logging) and the epoch is left unchanged.
+ */
+static void add_node(struct cluster_info *ci, uint32_t nodeid, uint32_t pid,
+                    struct sheepdog_node_list_entry *sd_ent)
+{
+       struct node *n = zalloc(sizeof(*n));
+
+       if (n == NULL) {
+               eprintf("out of memory\n");
+               return;
+       }
+
+       n->ent = *sd_ent;
+       n->nodeid = nodeid;
+       n->pid = pid;
+
+       list_add_tail(&n->list, &ci->node_list);
+       ci->epoch++;
+}
+
+/*
+ * This node is master when it is synchronized and it is either alone
+ * (node_list empty) or the first entry on node_list.  Returns 1 or 0.
+ */
+static int is_master(struct cluster_info *ci)
+{
+       struct node *first;
+
+       if (!ci->synchronized)
+               return 0;
+       if (list_empty(&ci->node_list))
+               return 1;
+
+       first = list_first_entry(&ci->node_list, struct node, list);
+       return node_cmp(&first->ent, &ci->this_node) == 0;
+}
+
+/*
+ * Master side of SD_MSG_JOIN: fill msg with this node's epoch and its
+ * current member list so that the joining node can adopt them.  Does
+ * nothing unless this node is the (synchronized) master.
+ */
+static void join(struct cluster_info *ci, struct join_message *msg)
+{
+       struct node *node;
+
+       /* is_master() already requires ci->synchronized */
+       if (!is_master(ci))
+               return;
+
+       msg->epoch = ci->epoch;
+
+       /*
+        * Never trust the incoming count: the joining node does not
+        * necessarily initialise it, and nodes[] must not be overrun.
+        */
+       msg->nr_nodes = 0;
+       list_for_each_entry(node, &ci->node_list, list) {
+               if (msg->nr_nodes >= SD_MAX_NODES) {
+                       eprintf("too many nodes\n");
+                       break;
+               }
+               msg->nodes[msg->nr_nodes].nodeid = node->nodeid;
+               msg->nodes[msg->nr_nodes].pid = node->pid;
+               msg->nodes[msg->nr_nodes].ent = node->ent;
+               msg->nr_nodes++;
+       }
+}
+
+/*
+ * Apply a completed SD_MSG_JOIN (runs on every node).  A node that is not
+ * yet synchronized replaces its node list and epoch with the master's view
+ * carried in msg and becomes synchronized.  Every node then appends the
+ * joining node, which also bumps its epoch.
+ */
+static void update_cluster_info(struct cluster_info *ci,
+                               struct join_message *msg)
+{
+       uint32_t i, nr_nodes = msg->nr_nodes;
+       struct node *node, *e;
+
+       if (!ci->synchronized) {
+               list_for_each_entry_safe(node, e, &ci->node_list, list) {
+                       list_del(&node->list);
+                       free(node);
+               }
+               INIT_LIST_HEAD(&ci->node_list);
+
+               /* the count comes off the wire; never read past nodes[] */
+               if (nr_nodes > SD_MAX_NODES) {
+                       eprintf("bad node count %u\n", (unsigned)nr_nodes);
+                       nr_nodes = SD_MAX_NODES;
+               }
+               for (i = 0; i < nr_nodes; i++)
+                       add_node(ci, msg->nodes[i].nodeid, msg->nodes[i].pid,
+                                &msg->nodes[i].ent);
+
+               /* adopt the master's epoch, overriding add_node()'s bumps */
+               ci->epoch = msg->epoch;
+               ci->synchronized = 1;
+       }
+
+       add_node(ci, msg->nodeid, msg->pid, &msg->header.from);
+       print_node_list(ci);
+}
+
+/*
+ * Master side of SD_MSG_VDI_OP: execute the request in msg against the
+ * shared VDI directory and store the resulting oid and result code in
+ * msg->rsp.  The caller then multicasts msg back with done set.
+ */
+static void vdi_op(struct cluster_info *ci, struct vdi_op_message *msg)
+{
+       const struct sd_vdi_req *hdr = &msg->req;
+       struct sd_vdi_rsp *rsp = &msg->rsp;
+       void *data = msg->data;
+       int ret = SD_RES_SUCCESS, is_current = 0;
+       uint64_t oid = 0;
+       struct sheepdog_super_block *sb;
+       struct timeval tv;
+       struct sheepdog_node_list_entry entries[SD_MAX_NODES];
+       int nr_nodes;
+
+       switch (hdr->opcode) {
+       case SD_OP_NEW_VDI:
+               ret = add_vdi(ci, data, strlen(data), hdr->vdi_size, &oid,
+                             hdr->base_oid, hdr->tag);
+               break;
+       case SD_OP_LOCK_VDI:
+       case SD_OP_GET_VDI_INFO:
+               /* lookup_vdi() returns 0 or a positive error code */
+               ret = lookup_vdi(ci, data, &oid, hdr->tag, 1, &is_current);
+               if (ret != 0)
+                       break;
+               if (is_current)
+                       rsp->flags = SD_VDI_RSP_FLAG_CURRENT;
+               break;
+       case SD_OP_RELEASE_VDI:
+               break;
+       case SD_OP_MAKE_FS:
+               sb = zalloc(sizeof(*sb));
+               if (!sb) {
+                       /* same code vdi_op_done() uses for allocation failure */
+                       ret = SD_RES_UNKNOWN;
+                       break;
+               }
+               gettimeofday(&tv, NULL);
+               /* seconds in the upper 32 bits, nanoseconds in the lower */
+               sb->ctime = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000;
+               sb->default_nr_copies = 3;
+
+               nr_nodes = build_node_list(&ci->node_list, entries);
+               ret = write_object(entries, nr_nodes, ci->epoch,
+                                  SD_DIR_OID, (char *)sb, sizeof(*sb), 0,
+                                  sb->default_nr_copies, 1);
+               free(sb);
+               break;
+       case SD_OP_UPDATE_EPOCH:
+               break;
+       default:
+               ret = SD_RES_SYSTEM_ERROR;
+               eprintf("opcode %d is not implemented\n", hdr->opcode);
+               break;
+       }
+
+       rsp->oid = oid;
+       rsp->result = ret;
+}
+
+/*
+ * Apply a completed SD_MSG_VDI_OP (runs on every node).  The lock table
+ * (vm_list) is updated for LOCK/RELEASE.  On the node that originated the
+ * request, the request at the head of pending_list is then completed with
+ * the result.
+ */
+static void vdi_op_done(struct cluster_info *ci, struct vdi_op_message *msg)
+{
+       const struct sd_vdi_req *hdr = &msg->req;
+       struct sd_vdi_rsp *rsp = &msg->rsp;
+       void *data = msg->data;
+       struct vm *vm;
+       struct request *req;
+       int ret = msg->rsp.result;
+
+       switch (hdr->opcode) {
+       case SD_OP_NEW_VDI:
+               break;
+       case SD_OP_LOCK_VDI:
+               /*
+                * The master's lookup in vdi_op() failed (e.g. no such VDI):
+                * report that error and don't record a lock nobody holds.
+                */
+               if (ret != SD_RES_SUCCESS)
+                       break;
+
+               if (lookup_vm(&ci->vm_list, (char *)data)) {
+                       ret = SD_RES_VDI_LOCKED;
+                       break;
+               }
+
+               /*
+                * ent.name is assumed to be a fixed-size array (like
+                * host_addr below); refuse names that would overflow it.
+                */
+               if (strlen((char *)data) >= sizeof(vm->ent.name)) {
+                       eprintf("vdi name is too long\n");
+                       ret = SD_RES_UNKNOWN;
+                       break;
+               }
+
+               vm = zalloc(sizeof(*vm));
+               if (!vm) {
+                       ret = SD_RES_UNKNOWN;
+                       break;
+               }
+               strcpy((char *)vm->ent.name, (char *)data);
+               memcpy(vm->ent.host_addr, msg->header.from.addr,
+                      sizeof(vm->ent.host_addr));
+               vm->ent.host_port = msg->header.from.port;
+
+               list_add(&vm->list, &ci->vm_list);
+               break;
+       case SD_OP_RELEASE_VDI:
+               vm = lookup_vm(&ci->vm_list, (char *)data);
+               if (!vm) {
+                       ret = SD_RES_VDI_NOT_LOCKED;
+                       break;
+               }
+
+               list_del(&vm->list);
+               free(vm);
+               break;
+       case SD_OP_GET_VDI_INFO:
+               break;
+       case SD_OP_UPDATE_EPOCH:
+               break;
+       case SD_OP_MAKE_FS:
+               break;
+       default:
+               eprintf("unknown operation %d\n", hdr->opcode);
+               ret = SD_RES_UNKNOWN;
+       }
+
+       /* only the originating node has a client waiting for this reply */
+       if (node_cmp(&ci->this_node, &msg->header.from) != 0)
+               return;
+
+       if (list_empty(&ci->pending_list)) {
+               eprintf("no pending request for this reply\n");
+               return;
+       }
+       req = list_first_entry(&ci->pending_list, struct request, pending_list);
+
+       rsp->result = ret;
+       memcpy(req->data, data, rsp->data_length);
+       memcpy(&req->rp, rsp, sizeof(req->rp));
+       list_del(&req->pending_list);
+       req->done(req);
+}
+
+/*
+ * Work handler for one delivered group message.  With done == 1 the message
+ * is a result and is applied on every node.  With done == 0 it is a request:
+ * only the master handles it (join()/vdi_op()) and then re-multicasts it
+ * with done set.
+ */
+static void __sd_deliver(struct work *work, int idx)
+{
+       struct work_deliver *w = container_of(work, struct work_deliver, work);
+       struct message_header *m = w->msg;
+       struct cluster_info *ci = w->ci;
+
+       dprintf("op: %d, done: %d, size: %d, from: %d.%d.%d.%d:%d\n",
+               m->op, m->done, m->msg_length,
+               m->from.addr[12], m->from.addr[13],
+               m->from.addr[14], m->from.addr[15], m->from.port);
+
+       if (m->done) {
+               switch (m->op) {
+               case SD_MSG_JOIN:
+                       update_cluster_info(ci, (struct join_message *)m);
+                       break;
+               case SD_MSG_VDI_OP:
+                       vdi_op_done(ci, (struct vdi_op_message *)m);
+                       break;
+               default:
+                       eprintf("unknown message %d\n", m->op);
+                       break;
+               }
+               return;
+       }
+
+       if (!is_master(ci))
+               return;
+
+       switch (m->op) {
+       case SD_MSG_JOIN:
+               join(ci, (struct join_message *)m);
+               break;
+       case SD_MSG_VDI_OP:
+               vdi_op(ci, (struct vdi_op_message *)m);
+               break;
+       default:
+               eprintf("unknown message %d\n", m->op);
+               break;
+       }
+
+       m->done = 1;
+       send_message(ci->handle, m);
+}
+
+/* Completion for __sd_deliver(): release the message copy and the work item. */
+static void __sd_deliver_done(struct work *work, int idx)
+{
+       struct work_deliver *w;
+
+       w = container_of(work, struct work_deliver, work);
+       free(w->msg);
+       free(w);
+}
+
+/*
+ * CPG deliver callback.  The message is copied (presumably because the
+ * CPG buffer is only valid during the callback), and processing is
+ * deferred to __sd_deliver() on the work queue.
+ */
+static void sd_deliver(cpg_handle_t handle, const struct cpg_name *group_name,
+                      uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
+{
+       struct work_deliver *w;
+       struct message_header *m = msg;
+       struct cluster_info *ci;
+
+       dprintf("op: %d, done: %d, size: %d, from: %d.%d.%d.%d:%d\n",
+               m->op, m->done, m->msg_length,
+               m->from.addr[12], m->from.addr[13],
+               m->from.addr[14], m->from.addr[15], m->from.port);
+       cpg_context_get(handle, (void **)&ci);
+
+       w = zalloc(sizeof(*w));
+       if (!w) {
+               eprintf("out of memory, message dropped\n");
+               return;
+       }
+
+       w->msg = zalloc(msg_len);
+       if (!w->msg) {
+               eprintf("out of memory, message dropped\n");
+               free(w);
+               return;
+       }
+       memcpy(w->msg, msg, msg_len);
+
+       w->ci = ci;
+
+       w->work.fn = __sd_deliver;
+       w->work.done = __sd_deliver_done;
+
+       queue_work(&w->work);
+}
+
+/*
+ * Work handler for a CPG configuration change:
+ *  - marks this node synchronized when it is member_list[0] and the member
+ *    count equals joined - left;
+ *  - removes departed nodes from node_list, bumping the epoch for each;
+ *  - if this node is among the joined members, multicasts an SD_MSG_JOIN.
+ */
+static void __sd_confch(struct work *work, int idx)
+{
+       struct work_confch *w = container_of(work, struct work_confch, work);
+       struct cluster_info *ci = w->ci;
+       struct node *node, *e;
+       size_t i;
+
+       const struct cpg_address *member_list = w->member_list;
+       size_t member_list_entries = w->member_list_entries;
+       const struct cpg_address *left_list = w->left_list;
+       size_t left_list_entries = w->left_list_entries;
+       const struct cpg_address *joined_list = w->joined_list;
+       size_t joined_list_entries = w->joined_list_entries;
+
+       /*
+        * "member == joined - left", written as an addition so size_t can't
+        * wrap.  member_list[0] is only read when the list is non-empty.
+        */
+       if (member_list_entries > 0 &&
+           member_list_entries + left_list_entries == joined_list_entries &&
+           ci->this_nodeid == member_list[0].nodeid &&
+           ci->this_pid == member_list[0].pid)
+               ci->synchronized = 1;
+
+       for (i = 0; i < left_list_entries; i++) {
+               list_for_each_entry_safe(node, e, &ci->node_list, list) {
+                       if (node->nodeid != left_list[i].nodeid ||
+                           node->pid != left_list[i].pid)
+                               continue;
+
+                       list_del(&node->list);
+                       free(node);
+                       ci->epoch++;
+               }
+       }
+
+       for (i = 0; i < joined_list_entries; i++) {
+               struct join_message msg;
+
+               if (ci->this_nodeid != joined_list[i].nodeid ||
+                   ci->this_pid != joined_list[i].pid)
+                       continue;
+
+               /*
+                * Zero the whole message: join() on the master indexes
+                * nodes[] by nr_nodes, and pad/master_node go on the wire.
+                */
+               memset(&msg, 0, sizeof(msg));
+               msg.header.op = SD_MSG_JOIN;
+               msg.header.done = 0;
+               msg.header.msg_length = sizeof(msg);
+               msg.header.from = ci->this_node;
+               msg.nodeid = ci->this_nodeid;
+               msg.pid = ci->this_pid;
+
+               send_message(ci->handle, (struct message_header *)&msg);
+               /* (nodeid, pid) identifies this node uniquely */
+               break;
+       }
+
+       if (left_list_entries == 0)
+               return;
+
+       print_node_list(ci);
+}
+
+/* Completion for __sd_confch(): free the copied address lists and the work. */
+static void __sd_confch_done(struct work *work, int idx)
+{
+       struct work_confch *w = container_of(work, struct work_confch, work);
+       struct cpg_address *lists[] = {
+               w->member_list, w->left_list, w->joined_list,
+       };
+       size_t i;
+
+       for (i = 0; i < sizeof(lists) / sizeof(lists[0]); i++)
+               free(lists[i]);
+       free(w);
+}
+
+/*
+ * Duplicate nr CPG addresses from src into a fresh allocation stored in
+ * *dst.  An empty list is not an error even if the allocator returns NULL
+ * for a zero-byte request.  Returns 0 on success, -1 on allocation failure.
+ */
+static int copy_cpg_list(struct cpg_address **dst,
+                        const struct cpg_address *src, size_t nr)
+{
+       size_t size = sizeof(*src) * nr;
+
+       *dst = zalloc(size);
+       if (!*dst)
+               return size ? -1 : 0;
+       memcpy(*dst, src, size);
+       return 0;
+}
+
+/*
+ * CPG configuration-change callback.  The three address lists are copied
+ * and processing is deferred to __sd_confch() on the work queue.  If memory
+ * runs out the event is dropped and everything allocated so far is freed.
+ */
+static void sd_confch(cpg_handle_t handle, const struct cpg_name *group_name,
+                     const struct cpg_address *member_list,
+                     size_t member_list_entries,
+                     const struct cpg_address *left_list,
+                     size_t left_list_entries,
+                     const struct cpg_address *joined_list,
+                     size_t joined_list_entries)
+{
+       struct work_confch *w;
+       struct cluster_info *ci;
+       size_t i;
+
+       if (member_list_entries)
+               dprintf("confchg nodeid %x\n", member_list[0].nodeid);
+       dprintf("%zu %zu %zu\n", member_list_entries, left_list_entries,
+               joined_list_entries);
+       for (i = 0; i < member_list_entries; i++) {
+               dprintf("[%zu] node_id: %d, pid: %d, reason: %d\n", i,
+                       member_list[i].nodeid, member_list[i].pid,
+                       member_list[i].reason);
+       }
+
+       cpg_context_get(handle, (void **)&ci);
+
+       w = zalloc(sizeof(*w));
+       if (!w)
+               goto err;
+
+       if (copy_cpg_list(&w->member_list, member_list, member_list_entries) ||
+           copy_cpg_list(&w->left_list, left_list, left_list_entries) ||
+           copy_cpg_list(&w->joined_list, joined_list, joined_list_entries))
+               goto err;
+
+       w->member_list_entries = member_list_entries;
+       w->left_list_entries = left_list_entries;
+       w->joined_list_entries = joined_list_entries;
+
+       w->ci = ci;
+
+       w->work.fn = __sd_confch;
+       w->work.done = __sd_confch_done;
+
+       queue_work(&w->work);
+       return;
+err:
+       eprintf("out of memory, configuration change dropped\n");
+       if (w) {
+               /* w was zero-filled, so lists never allocated are NULL */
+               free(w->member_list);
+               free(w->left_list);
+               free(w->joined_list);
+               free(w);
+       }
+}
+
+/*
+ * Count the nodes on node_list.  If entries is non-NULL, also copy each
+ * node's entry into it and sort the copies with node_cmp().  entries must
+ * have room for every node.  Returns the node count.
+ */
+int build_node_list(struct list_head *node_list,
+                   struct sheepdog_node_list_entry *entries)
+{
+       struct node *n;
+       int count = 0;
+
+       list_for_each_entry(n, node_list, list) {
+               if (entries != NULL)
+                       memcpy(&entries[count], &n->ent, sizeof(entries[0]));
+               count++;
+       }
+
+       if (entries == NULL)
+               return count;
+
+       qsort(entries, count, sizeof(entries[0]), node_cmp);
+       return count;
+}
+
+/*
+ * Join the "sheepdog" CPG group and set up this node's identity:
+ *  - the address comes from resolving the local host name;
+ *  - the port is the port argument;
+ *  - the id is the SHA-1 of address and port.
+ * The CPG fd is then registered with the event loop.
+ *
+ * Returns the new cluster_info, or NULL if allocation or cpg_initialize()
+ * fails.  Any later setup failure terminates the process with exit(1).
+ */
+struct cluster_info *create_cluster(int port)
+{
+       int fd, ret;
+       cpg_handle_t cpg_handle;
+       struct cluster_info *ci;
+       struct addrinfo hints, *res;
+       /* room for the host name; truncation is handled after gethostname() */
+       char name[256];
+       SHA_CTX ctx;
+       struct cpg_name group = { 8, "sheepdog" };
+       cpg_callbacks_t cb = { &sd_deliver, &sd_confch };
+       unsigned int nodeid = 0;
+
+       ci = zalloc(sizeof(*ci));
+       if (!ci)
+               return NULL;
+
+       ret = cpg_initialize(&cpg_handle, &cb);
+       if (ret != CS_OK) {
+               eprintf("Failed to initialize cpg, %d\n", ret);
+               eprintf("Is corosync running?\n");
+               free(ci);
+               return NULL;
+       }
+
+join_retry:
+       ret = cpg_join(cpg_handle, &group);
+       switch (ret) {
+       case CS_OK:
+               break;
+       case CS_ERR_TRY_AGAIN:
+               dprintf("Failed to join the sheepdog group, try again\n");
+               sleep(1);
+               goto join_retry;
+       case CS_ERR_SECURITY:
+               eprintf("Permission error.\n");
+               exit(1);
+       default:
+               eprintf("Failed to join the sheepdog group, %d\n", ret);
+               exit(1);
+               break;
+       }
+
+       ret = cpg_local_get(cpg_handle, &nodeid);
+       if (ret != CS_OK) {
+               eprintf("Failed to get the local node's identifier, %d\n", ret);
+               exit(1);
+       }
+
+       ci->handle = cpg_handle;
+       ci->this_nodeid = nodeid;
+       ci->this_pid = getpid();
+
+       memset(&ci->this_node, 0, sizeof(ci->this_node));
+
+       if (gethostname(name, sizeof(name)) < 0) {
+               eprintf("failed to get the host name\n");
+               exit(1);
+       }
+       /* POSIX does not guarantee termination of a truncated host name */
+       name[sizeof(name) - 1] = '\0';
+
+       memset(&hints, 0, sizeof(hints));
+
+       hints.ai_socktype = SOCK_STREAM;
+       ret = getaddrinfo(name, NULL, &hints, &res);
+       if (ret) {
+               eprintf("failed to resolve %s, %s\n", name, gai_strerror(ret));
+               exit(1);
+       }
+
+       /* IPv4 addresses are stored in the last 4 bytes of the 16-byte addr */
+       if (res->ai_family == AF_INET) {
+               struct sockaddr_in *addr = (struct sockaddr_in *)res->ai_addr;
+               memset(ci->this_node.addr, 0, sizeof(ci->this_node.addr));
+               memcpy(ci->this_node.addr + 12, &addr->sin_addr, 4);
+       } else if (res->ai_family == AF_INET6) {
+               struct sockaddr_in6 *addr = (struct sockaddr_in6 *)res->ai_addr;
+               memcpy(ci->this_node.addr, &addr->sin6_addr, 16);
+       } else {
+               eprintf("unknown address family\n");
+               exit(1);
+       }
+
+       freeaddrinfo(res);
+
+       ci->this_node.port = port;
+
+       /* node id = SHA-1(addr, port) */
+       SHA1_Init(&ctx);
+       SHA1_Update(&ctx, ci->this_node.addr, sizeof(ci->this_node.addr));
+       SHA1_Update(&ctx, &ci->this_node.port, sizeof(ci->this_node.port));
+       SHA1_Final(ci->this_node.id, &ctx);
+
+       ci->synchronized = 0;
+       INIT_LIST_HEAD(&ci->node_list);
+       INIT_LIST_HEAD(&ci->vm_list);
+       INIT_LIST_HEAD(&ci->pending_list);
+       cpg_context_set(cpg_handle, ci);
+
+       cpg_fd_get(cpg_handle, &fd);
+       register_event(fd, group_handler, ci);
+       return ci;
+}
diff --git a/collie/vdi.c b/collie/vdi.c
new file mode 100644
index 0000000..184a22e
--- /dev/null
+++ b/collie/vdi.c
@@ -0,0 +1,259 @@
+/*
+ * Copyright (C) 2009 Nippon Telegraph and Telephone Corporation.
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License version
+ * 2 as published by the Free Software Foundation.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/time.h>
+#include <openssl/sha.h>
+
+#include "sheepdog_proto.h"
+#include "meta.h"
+#include "collie.h"
+
+/*
+ * Return non-zero if the directory entry ent is in use and its name is
+ * exactly the len bytes at name.
+ */
+static int sheepdog_match(struct sheepdog_dir_entry *ent, char *name, int len)
+{
+       return ent->name_len && ent->name_len == len &&
+              memcmp(ent->name, name, len) == 0;
+}
+
+/*
+ * Create the inode object oid for a new VDI of size bytes with copies
+ * replicas.  When base_oid is non-zero:
+ *  - the new inode records the base as its parent and starts with a copy
+ *    of the base's data_oid[] table;
+ *  - oid is stored in the first free child_oid[] slot of the base inode,
+ *    which is written back before the new inode is written.
+ *
+ * Returns the final write_object() result (presumably 0) on success,
+ * otherwise one of SD_RES_BASE_VDI_READ, SD_RES_NO_BASE_VDI,
+ * SD_RES_BASE_VDI_WRITE or SD_RES_VDI_WRITE.
+ * NOTE(review): SD_RES_NO_BASE_VDI is also returned when the base has no
+ * free child slot.
+ *
+ * TODO: should be performed atomically
+ */
+static int create_inode_obj(struct sheepdog_node_list_entry *entries,
+                           int nr_nodes, uint64_t epoch, int copies,
+                           uint64_t oid, uint64_t size, uint64_t base_oid)
+{
+       struct sheepdog_inode inode, base;
+       struct timeval tv;
+       int ret;
+
+       if (base_oid) {
+               ret = read_object(entries, nr_nodes, epoch,
+                                 base_oid, (char *)&base, sizeof(base), 0,
+                                 copies);
+               if (ret < 0)
+                       return SD_RES_BASE_VDI_READ;
+       }
+
+       gettimeofday(&tv, NULL);
+
+       memset(&inode, 0, sizeof(inode));
+
+       inode.oid = oid;
+       inode.vdi_size = size;
+       inode.block_size = SD_DATA_OBJ_SIZE;
+       /* seconds in the upper 32 bits, nanoseconds in the lower */
+       inode.ctime = (uint64_t) tv.tv_sec << 32 | tv.tv_usec * 1000;
+       inode.nr_copies = copies;
+
+       if (base_oid) {
+               size_t i;
+
+               /* both values are size_t, so %zu is the matching specifier */
+               eprintf("%zu %zu\n", sizeof(inode.data_oid),
+                       ARRAY_SIZE(base.child_oid));
+               inode.parent_oid = base_oid;
+               memcpy(inode.data_oid, base.data_oid,
+                      MAX_DATA_OBJS * sizeof(uint64_t));
+
+               for (i = 0; i < ARRAY_SIZE(base.child_oid); i++) {
+                       if (!base.child_oid[i]) {
+                               base.child_oid[i] = oid;
+                               break;
+                       }
+               }
+
+               if (i == ARRAY_SIZE(base.child_oid))
+                       return SD_RES_NO_BASE_VDI;
+
+               ret = write_object(entries, nr_nodes,
+                                  epoch, base_oid, (char *)&base,
+                                  sizeof(base), 0, copies, 0);
+               if (ret < 0)
+                       return SD_RES_BASE_VDI_WRITE;
+       }
+
+       ret = write_object(entries, nr_nodes, epoch,
+                          oid, (char *)&inode, sizeof(inode), 0, copies, 1);
+       if (ret < 0)
+               return SD_RES_VDI_WRITE;
+
+       return ret;
+}
+
+#define DIR_BUF_LEN (UINT64_C(1) << 20)
+
+/*
+ * Create a new VDI named by the len bytes at name, size bytes large,
+ * optionally cloned from base_oid.
+ *
+ * The steps are:
+ *  - read the super block (for default_nr_copies) and the directory that
+ *    follows it into a DIR_BUF_LEN buffer;
+ *  - append a new entry after the last used one, with
+ *    oid = last oid + (1 << 18);
+ *  - create the inode object;
+ *  - write the directory back.
+ * With a non-zero tag an existing VDI of the same name is allowed and
+ * loses its FLAG_CURRENT mark.
+ *
+ * Returns 0 and stores the new oid in *added_oid on success.  Otherwise it
+ * returns a non-zero error, mostly SD_RES_* codes (1 if the buffer cannot
+ * be allocated).
+ *
+ * TODO: handle larger buffer
+ */
+int add_vdi(struct cluster_info *cluster, char *name, int len, uint64_t size,
+           uint64_t *added_oid, uint64_t base_oid, uint32_t tag)
+{
+       struct sheepdog_node_list_entry entries[SD_MAX_NODES];
+       int nr_nodes;
+       struct sheepdog_dir_entry *prv, *ent;
+       uint64_t oid = 0;
+       char *buf;
+       int ret, rest;
+       struct sheepdog_super_block *sb;
+       int copies;
+
+       nr_nodes = build_node_list(&cluster->node_list, entries);
+
+       eprintf("%s (%d) %" PRIu64 ", base: %" PRIu64 "\n", name, len, size,
+               base_oid);
+
+       buf = zalloc(DIR_BUF_LEN);
+       if (!buf)
+               return 1;
+
+       ret = read_object(entries, nr_nodes, cluster->epoch,
+                         SD_DIR_OID, buf, DIR_BUF_LEN, 0, nr_nodes);
+       if (ret < 0) {
+               ret = SD_RES_DIR_READ;
+               goto out;
+       }
+
+       sb = (struct sheepdog_super_block *)buf;
+       copies = sb->default_nr_copies;
+
+       ret = read_object(entries, nr_nodes, cluster->epoch,
+                         SD_DIR_OID, buf, DIR_BUF_LEN, sizeof(*sb), nr_nodes);
+       if (ret < 0) {
+               ret = SD_RES_DIR_READ;
+               goto out;
+       }
+
+       ent = (struct sheepdog_dir_entry *)buf;
+       rest = ret;
+       while (rest > 0) {
+               if (!ent->name_len)
+                       break;
+
+               if (sheepdog_match(ent, name, len) && !tag) {
+                       ret = SD_RES_VDI_EXIST;
+                       goto out;
+               }
+               oid = ent->oid;
+               prv = ent;
+               ent = next_entry(prv);
+               rest -= ((char *)ent - (char *)prv);
+       }
+
+       /*
+        * Make sure the new entry fits in the buffer before the inode is
+        * created, so a full directory neither overflows buf nor leaves an
+        * orphaned inode.  No dedicated "directory full" code is visible
+        * here, so SD_RES_DIR_WRITE is reported.
+        */
+       if ((size_t)((char *)ent - buf) + sizeof(*ent) > DIR_BUF_LEN) {
+               eprintf("directory is full\n");
+               ret = SD_RES_DIR_WRITE;
+               goto out;
+       }
+       ent->name_len = len;
+       if ((size_t)((char *)next_entry(ent) - buf) > DIR_BUF_LEN) {
+               eprintf("directory is full\n");
+               ret = SD_RES_DIR_WRITE;
+               goto out;
+       }
+
+       oid += (1 << 18);
+
+       ret = create_inode_obj(entries, nr_nodes, cluster->epoch, copies,
+                              oid, size, base_oid);
+       if (ret)
+               goto out;
+
+       ent->oid = oid;
+       ent->tag = tag;
+
+       ent->flags = FLAG_CURRENT;
+       ent->name_len = len;
+       memcpy(ent->name, name, len);
+
+       if (tag) {
+               struct sheepdog_dir_entry *e = (struct sheepdog_dir_entry *)buf;
+
+               /* the new tagged entry becomes the current one */
+               while (e < ent) {
+                       if (sheepdog_match(e, name, len))
+                               e->flags &= ~FLAG_CURRENT;
+                       e = next_entry(e);
+               }
+       }
+
+       ent = next_entry(ent);
+
+       ret = write_object(entries, nr_nodes, cluster->epoch,
+                          SD_DIR_OID, buf, (char *)ent - buf, sizeof(*sb),
+                          copies, 0);
+       if (ret) {
+               ret = SD_RES_DIR_WRITE;
+               goto out;
+       }
+
+       *added_oid = oid;
+out:
+       free(buf);
+
+       return ret;
+}
+
+/* Deleting a VDI is not implemented yet; always reports success (0). */
+int del_vdi(struct cluster_info *cluster, char *name, int len)
+{
+       (void)cluster;
+       (void)name;
+       (void)len;
+
+       return 0;
+}
+
+/*
+ * Look up the VDI named by the NUL-terminated filename in the directory.
+ *
+ * tag selects a specific tagged entry.  (uint32_t)-1 accepts only the
+ * entry marked FLAG_CURRENT, unless an entry's tag is itself equal to it.
+ *
+ * On success returns 0, stores the entry's oid in *oid and sets *current
+ * when the entry carries FLAG_CURRENT (*current is 0 otherwise).  On
+ * failure returns SD_RES_NO_VDI, SD_RES_NO_TAG, SD_RES_SYSTEM_ERROR or
+ * SD_RES_DIR_READ, or 1 if the buffer cannot be allocated.  do_lock is
+ * currently unused.
+ */
+int lookup_vdi(struct cluster_info *cluster,
+              char *filename, uint64_t * oid, uint32_t tag, int do_lock,
+              int *current)
+{
+       struct sheepdog_node_list_entry entries[SD_MAX_NODES];
+       int nr_nodes;
+       int rest, ret;
+       char *buf;
+       struct sheepdog_dir_entry *prv, *ent;
+       size_t filename_len = strlen(filename);
+
+       nr_nodes = build_node_list(&cluster->node_list, entries);
+
+       *current = 0;
+       buf = zalloc(DIR_BUF_LEN);
+       if (!buf)
+               return 1;
+
+       ret = read_object(entries, nr_nodes, cluster->epoch,
+                         SD_DIR_OID, buf, DIR_BUF_LEN,
+                         sizeof(struct sheepdog_super_block), nr_nodes);
+       if (ret < 0) {
+               ret = SD_RES_DIR_READ;
+               goto out;
+       }
+
+       eprintf("looking for %s %zu, %d\n", filename, filename_len, ret);
+
+       ent = (struct sheepdog_dir_entry *)buf;
+       rest = ret;
+       ret = SD_RES_NO_VDI;
+       while (rest > 0) {
+               if (!ent->name_len)
+                       break;
+
+               /* directory names are length-prefixed, not NUL-terminated */
+               eprintf("%.*s %d %" PRIu64 "\n", (int)ent->name_len,
+                       (char *)ent->name, ent->name_len, ent->oid);
+
+               if (sheepdog_match(ent, filename, filename_len)) {
+                       if (ent->tag != tag && tag != (uint32_t)-1) {
+                               ret = SD_RES_NO_TAG;
+                               goto next;
+                       }
+                       if (ent->tag != tag && !(ent->flags & FLAG_CURRENT)) {
+                               /* current vdi must exist */
+                               ret = SD_RES_SYSTEM_ERROR;
+                               goto next;
+                       }
+
+                       *oid = ent->oid;
+                       ret = 0;
+
+                       if (ent->flags & FLAG_CURRENT)
+                               *current = 1;
+                       break;
+               }
+next:
+               prv = ent;
+               ent = next_entry(prv);
+               rest -= ((char *)ent - (char *)prv);
+       }
+out:
+       free(buf);
+       return ret;
+}
-- 
1.5.6.5

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to