1. add basic {dog,sheep,sheepfs}_bnode_{reader,writer} function for b-tree to 
read/write node
2. add sd_extent_header to manage meta-data in data_vdi_id[] or middle-node
3. add new type of object: B-tree object as middle-node

Signed-off-by: Robin Dong <[email protected]>
---
 dog/cluster.c            |    2 +-
 dog/dog.h                |    9 +
 dog/vdi.c                |   59 ++++--
 include/sheepdog_proto.h |   61 ++++-
 lib/sd_inode.c           |  567 +++++++++++++++++++++++++++++++++++++++++++++-
 sheep/sheep_priv.h       |    9 +
 sheep/vdi.c              |   13 +-
 sheepfs/volume.c         |   43 ++++-
 8 files changed, 721 insertions(+), 42 deletions(-)

diff --git a/dog/cluster.c b/dog/cluster.c
index cf61372..eb3416a 100644
--- a/dog/cluster.c
+++ b/dog/cluster.c
@@ -261,7 +261,7 @@ static void fill_object_tree(uint32_t vid, const char 
*name, const char *tag,
        /* fill data object id */
        nr_objs = count_data_objs(i);
        for (uint64_t idx = 0; idx < nr_objs; idx++) {
-               vdi_id = sd_inode_get_vid(i, idx);
+               vdi_id = INODE_GET_VID(i, idx);
                if (vdi_id) {
                        uint64_t oid = vid_to_data_oid(vdi_id, idx);
                        object_tree_insert(oid, i->nr_copies, i->copy_policy);
diff --git a/dog/dog.h b/dog/dog.h
index 65d5e26..3cdf8e2 100644
--- a/dog/dog.h
+++ b/dog/dog.h
@@ -91,6 +91,15 @@ size_t get_store_objsize(uint8_t copy_policy, uint64_t oid);
 bool is_erasure_oid(uint64_t oid, uint8_t policy);
 uint8_t parse_copy(const char *str, uint8_t *copy_policy);
 
+int dog_bnode_writer(uint64_t oid, void *mem, unsigned int len,
+                    int copies, int copy_policy, int create);
+int dog_bnode_reader(uint64_t oid, void **mem, unsigned int len);
+
+#define INODE_GET_VID(inode, idx) (sd_inode_get_vid(dog_bnode_reader, \
+                                                       inode, idx))
+#define INODE_SET_VID(inode, idx, vdi_id) (sd_inode_set_vid(dog_bnode_writer, \
+                                       dog_bnode_reader, inode, idx, vdi_id))
+
 extern struct command vdi_command;
 extern struct command node_command;
 extern struct command cluster_command;
diff --git a/dog/vdi.c b/dog/vdi.c
index 7060cca..aa6eea6 100644
--- a/dog/vdi.c
+++ b/dog/vdi.c
@@ -58,6 +58,24 @@ struct get_vdi_info {
        uint8_t copy_policy;
 };
 
+int dog_bnode_writer(uint64_t oid, void *mem, unsigned int len,
+                    int copies, int copy_policy, int create)
+{
+       return sd_write_object(oid, 0, mem, len, 0, 0, copies, copy_policy,
+                       true, true);
+}
+
+int dog_bnode_reader(uint64_t oid, void **mem, unsigned int len)
+{
+       return sd_read_object(oid, *mem, len, 0, true);
+}
+
+static inline bool is_data_obj_writeable(const struct sd_inode *inode,
+                                        uint32_t idx)
+{
+       return inode->vdi_id == INODE_GET_VID(inode, idx);
+}
+
 static void vdi_show_progress(uint64_t done, uint64_t total)
 {
        return show_progress(done, total, false);
@@ -300,7 +318,7 @@ static int obj_info_filler(const char *sheep, uint64_t oid, 
struct sd_rsp *rsp,
                if (info->success)
                        break;
                info->success = true;
-               vdi_id = sd_inode_get_vid(inode, info->idx);
+               vdi_id = INODE_GET_VID(inode, info->idx);
                if (vdi_id) {
                        info->data_oid = vid_to_data_oid(vdi_id, info->idx);
                        return 1;
@@ -516,7 +534,7 @@ static int vdi_create(int argc, char **argv)
        uint64_t size;
        uint32_t vid;
        uint64_t oid;
-       int idx, max_idx, ret, nr_copies = vdi_cmd_data.nr_copies;
+       uint32_t idx, max_idx, ret, nr_copies = vdi_cmd_data.nr_copies;
        struct sd_inode *inode = NULL;
 
        if (!argv[optind]) {
@@ -563,7 +581,7 @@ static int vdi_create(int argc, char **argv)
                        goto out;
                }
 
-               sd_inode_set_vid(inode, idx, vid);
+               INODE_SET_VID(inode, idx, vid);
                ret = sd_write_object(vid_to_vdi_oid(vid), 0, &vid, sizeof(vid),
                                      SD_INODE_HEADER_SIZE + sizeof(vid) * idx,
                                      0, inode->nr_copies, inode->copy_policy,
@@ -632,7 +650,7 @@ static int vdi_clone(int argc, char **argv)
        const char *src_vdi = argv[optind++], *dst_vdi;
        uint32_t base_vid, new_vid, vdi_id;
        uint64_t oid;
-       int idx, max_idx, ret;
+       uint32_t idx, max_idx, ret;
        struct sd_inode *inode = NULL;
        char *buf = NULL;
 
@@ -670,7 +688,7 @@ static int vdi_clone(int argc, char **argv)
                size_t size;
 
                vdi_show_progress(idx * SD_DATA_OBJ_SIZE, inode->vdi_size);
-               vdi_id = sd_inode_get_vid(inode, idx);
+               vdi_id = INODE_GET_VID(inode, idx);
                if (vdi_id) {
                        oid = vid_to_data_oid(vdi_id, idx);
                        ret = sd_read_object(oid, buf, SD_DATA_OBJ_SIZE, 0, 
true);
@@ -1192,10 +1210,10 @@ static int vdi_getattr(int argc, char **argv)
 static int vdi_read(int argc, char **argv)
 {
        const char *vdiname = argv[optind++];
-       int ret, idx;
+       int ret;
        struct sd_inode *inode = NULL;
        uint64_t offset = 0, oid, done = 0, total = (uint64_t) -1;
-       uint32_t vdi_id;
+       uint32_t vdi_id, idx;
        unsigned int len;
        char *buf = NULL;
 
@@ -1230,7 +1248,7 @@ static int vdi_read(int argc, char **argv)
        offset %= SD_DATA_OBJ_SIZE;
        while (done < total) {
                len = min(total - done, SD_DATA_OBJ_SIZE - offset);
-               vdi_id = sd_inode_get_vid(inode, idx);
+               vdi_id = INODE_GET_VID(inode, idx);
                if (vdi_id) {
                        oid = vid_to_data_oid(vdi_id, idx);
                        ret = sd_read_object(oid, buf, len, offset, false);
@@ -1265,8 +1283,8 @@ out:
 static int vdi_write(int argc, char **argv)
 {
        const char *vdiname = argv[optind++];
-       uint32_t vid, flags, vdi_id;
-       int ret, idx;
+       uint32_t vid, flags, vdi_id, idx;
+       int ret;
        struct sd_inode *inode = NULL;
        uint64_t offset = 0, oid, old_oid, done = 0, total = (uint64_t) -1;
        unsigned int len;
@@ -1306,7 +1324,7 @@ static int vdi_write(int argc, char **argv)
                flags = 0;
                len = min(total - done, SD_DATA_OBJ_SIZE - offset);
 
-               vdi_id = sd_inode_get_vid(inode, idx);
+               vdi_id = INODE_GET_VID(inode, idx);
                if (!vdi_id)
                        create = true;
                else if (!is_data_obj_writeable(inode, idx)) {
@@ -1328,7 +1346,7 @@ static int vdi_write(int argc, char **argv)
                        total = done + len;
                }
 
-               sd_inode_set_vid(inode, idx, inode->vdi_id);
+               INODE_SET_VID(inode, idx, inode->vdi_id);
                oid = vid_to_data_oid(inode->vdi_id, idx);
                ret = sd_write_object(oid, old_oid, buf, len, offset, flags,
                                      inode->nr_copies, inode->copy_policy,
@@ -1728,7 +1746,7 @@ static void queue_vdi_check_work(const struct sd_inode 
*inode, uint64_t oid,
 
 int do_vdi_check(const struct sd_inode *inode)
 {
-       int max_idx;
+       uint32_t max_idx;
        uint64_t done = 0, oid;
        uint32_t vid;
        struct work_queue *wq;
@@ -1746,8 +1764,8 @@ int do_vdi_check(const struct sd_inode *inode)
 
        max_idx = count_data_objs(inode);
        vdi_show_progress(done, inode->vdi_size);
-       for (int idx = 0; idx < max_idx; idx++) {
-               vid = sd_inode_get_vid(inode, idx);
+       for (uint32_t idx = 0; idx < max_idx; idx++) {
+               vid = INODE_GET_VID(inode, idx);
                if (vid) {
                        oid = vid_to_data_oid(vid, idx);
                        queue_vdi_check_work(inode, oid, &done, wq);
@@ -1822,7 +1840,7 @@ static void compact_obj_backup(struct obj_backup *backup, 
uint8_t *from_data)
        }
 }
 
-static int get_obj_backup(int idx, uint32_t from_vid, uint32_t to_vid,
+static int get_obj_backup(uint32_t idx, uint32_t from_vid, uint32_t to_vid,
                          struct obj_backup *backup)
 {
        int ret;
@@ -1863,7 +1881,8 @@ static int get_obj_backup(int idx, uint32_t from_vid, 
uint32_t to_vid,
 static int vdi_backup(int argc, char **argv)
 {
        const char *vdiname = argv[optind++];
-       int ret = EXIT_SUCCESS, idx, nr_objs;
+       int ret = EXIT_SUCCESS;
+       uint32_t idx, nr_objs;
        struct sd_inode *from_inode = xzalloc(sizeof(*from_inode));
        struct sd_inode *to_inode = xzalloc(sizeof(*to_inode));
        struct backup_hdr hdr = {
@@ -1902,8 +1921,8 @@ static int vdi_backup(int argc, char **argv)
        }
 
        for (idx = 0; idx < nr_objs; idx++) {
-               uint32_t from_vid = sd_inode_get_vid(from_inode, idx);
-               uint32_t to_vid = sd_inode_get_vid(to_inode, idx);
+               uint32_t from_vid = INODE_GET_VID(from_inode, idx);
+               uint32_t to_vid = INODE_GET_VID(to_inode, idx);
 
                if (to_vid == 0 && from_vid == 0)
                        continue;
@@ -1956,7 +1975,7 @@ static int restore_obj(struct obj_backup *backup, 
uint32_t vid,
                       struct sd_inode *parent_inode)
 {
        int ret;
-       uint32_t parent_vid = sd_inode_get_vid(parent_inode, backup->idx);
+       uint32_t parent_vid = INODE_GET_VID(parent_inode, backup->idx);
        uint64_t parent_oid = 0;
 
        if (parent_vid)
diff --git a/include/sheepdog_proto.h b/include/sheepdog_proto.h
index 36d5701..acae97d 100644
--- a/include/sheepdog_proto.h
+++ b/include/sheepdog_proto.h
@@ -74,6 +74,9 @@
 #define SD_RES_JOIN_FAILED   0x18 /* Target node had failed to join sheepdog */
 #define SD_RES_HALT          0x19 /* Sheepdog is stopped doing IO */
 #define SD_RES_READONLY      0x1A /* Object is read-only */
+#define SD_RES_BTREE_NOT_FOUND 0x1B /* Cannot found node in btree */
+#define SD_RES_BTREE_FOUND   0x1C /* Found node in btree */
+#define SD_RES_BTREE_REPEAT  0x1D /* Should repeat op in btree */
 
 /* errors above 0x80 are sheepdog-internal */
 
@@ -92,6 +95,7 @@
 #define VDI_BIT (UINT64_C(1) << 63)
 #define VMSTATE_BIT (UINT64_C(1) << 62)
 #define VDI_ATTR_BIT (UINT64_C(1) << 61)
+#define VDI_BTREE_BIT (UINT64_C(1) << 60)
 #define MAX_DATA_OBJS (1ULL << 20)
 #define MAX_CHILDREN 1024U
 #define SD_MAX_VDI_LEN 256U
@@ -104,8 +108,9 @@
 #define SD_MAX_VDI_SIZE (SD_DATA_OBJ_SIZE * MAX_DATA_OBJS)
 
 #define SD_INODE_SIZE (sizeof(struct sd_inode))
-#define SD_INODE_HEADER_SIZE (sizeof(struct sd_inode) - \
-                             sizeof(uint32_t) * MAX_DATA_OBJS)
+#define SD_INODE_INDEX_SIZE (sizeof(uint32_t) * MAX_DATA_OBJS)
+#define SD_INODE_HEADER_SIZE ((unsigned long) \
+                             (&((struct sd_inode *)0)->data_vdi_id))
 #define SD_ATTR_OBJ_SIZE (sizeof(struct sheepdog_vdi_attr))
 #define CURRENT_VDI_ID 0
 
@@ -215,7 +220,7 @@ struct sd_inode {
        uint64_t vdi_size;
        uint64_t vm_state_size;
        uint8_t  copy_policy;
-       uint8_t  reserved;
+       uint8_t  store_policy;
        uint8_t  nr_copies;
        uint8_t  block_size_shift;
        uint32_t snap_id;
@@ -223,8 +228,31 @@ struct sd_inode {
        uint32_t parent_vdi_id;
        uint32_t child_vdi_id[MAX_CHILDREN];
        uint32_t data_vdi_id[MAX_DATA_OBJS];
+       uint32_t btree_counter;
 };
 
+struct sd_extent {
+       uint32_t idx;
+       uint32_t vdi_id;
+};
+
+struct sd_extent_idx {
+       uint32_t idx;
+       uint64_t oid;
+};
+
+#define INODE_BTREE_MAGIC      0x6274
+
+struct sd_extent_header {
+       uint16_t magic;
+       uint16_t depth; /* 1 -- ext node; 2 -- idx node */
+       uint32_t entries;
+};
+
+typedef int (*write_node_fn)(uint64_t id, void *mem, unsigned int len,
+                               int copies, int copy_policy, int create);
+typedef int (*read_node_fn)(uint64_t id, void **mem, unsigned int len);
+
 struct sheepdog_vdi_attr {
        char name[SD_MAX_VDI_LEN];
        char tag[SD_MAX_VDI_TAG_LEN];
@@ -235,9 +263,12 @@ struct sheepdog_vdi_attr {
        char value[SD_MAX_VDI_ATTR_VALUE_LEN];
 };
 
-uint32_t sd_inode_get_vid(const struct sd_inode *inode, int idx);
-void sd_inode_set_vid(struct sd_inode *inode, int idx, uint32_t vdi_id);
-void sd_inode_copy_vids(struct sd_inode *oldi, struct sd_inode *newi);
+extern uint32_t sd_inode_get_vid(read_node_fn reader,
+                                const struct sd_inode *inode, uint32_t idx);
+extern void sd_inode_set_vid(write_node_fn writer, read_node_fn reader,
+                            struct sd_inode *inode, uint32_t idx,
+                            uint32_t vdi_id);
+extern void sd_inode_copy_vdis(struct sd_inode *oldi, struct sd_inode *newi);
 
 /* 64 bit FNV-1a non-zero initial basis */
 #define FNV1A_64_INIT ((uint64_t) 0xcbf29ce484222325ULL)
@@ -326,12 +357,6 @@ static inline uint64_t hash_64(uint64_t val, unsigned int 
bits)
        return sd_hash_64(val) >> (64 - bits);
 }
 
-static inline bool is_data_obj_writeable(const struct sd_inode *inode,
-                                        int idx)
-{
-       return inode->vdi_id == sd_inode_get_vid(inode, idx);
-}
-
 static inline bool is_vdi_obj(uint64_t oid)
 {
        return !!(oid & VDI_BIT);
@@ -347,10 +372,15 @@ static inline bool is_vdi_attr_obj(uint64_t oid)
        return !!(oid & VDI_ATTR_BIT);
 }
 
+static inline bool is_vdi_btree_obj(uint64_t oid)
+{
+       return !!(oid & VDI_BTREE_BIT);
+}
+
 static inline bool is_data_obj(uint64_t oid)
 {
        return !is_vdi_obj(oid) && !is_vmstate_obj(oid) &&
-               !is_vdi_attr_obj(oid);
+               !is_vdi_attr_obj(oid) && !is_vdi_btree_obj(oid);
 }
 
 static inline size_t count_data_objs(const struct sd_inode *inode)
@@ -394,6 +424,11 @@ static inline uint64_t vid_to_attr_oid(uint32_t vid, 
uint32_t attrid)
        return ((uint64_t)vid << VDI_SPACE_SHIFT) | VDI_ATTR_BIT | attrid;
 }
 
+static inline uint64_t vid_to_btree_oid(uint32_t vid, uint32_t btreeid)
+{
+       return ((uint64_t)vid << VDI_SPACE_SHIFT) | VDI_BTREE_BIT | btreeid;
+}
+
 static inline uint64_t vid_to_vmstate_oid(uint32_t vid, uint32_t idx)
 {
        return VMSTATE_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
diff --git a/lib/sd_inode.c b/lib/sd_inode.c
index 9d1926f..d3b4251 100644
--- a/lib/sd_inode.c
+++ b/lib/sd_inode.c
@@ -1,13 +1,572 @@
+/*
+ * B-tree is a tree data structure that keeps data sorted and allows searches,
+ * sequential access, insertions, and deletions in logarithmic time.
+ * The B-tree is a generalization of a binary search tree in that a node can
+ * have more than two children. (Comer 1979, p. 123) Unlike self-balancing
+ * binary search trees, the B-tree is optimized for systems that read and
+ * write large blocks of data. (ref: http://en.wikipedia.org/wiki/B-tree)
+ *
+ * In sheepdog, we use space in inode->data_vdi_id[] to store leaf-node at
+ * beginning and store root-node of B-tree when it reach depths of two.
+ *
+ * At beginning, the inode->data_vdi_id[] is storing leaf-node which point
+ * to data-obj directly:
+ *
+ *     +------------------+-----------+-----------+--------+
+ *     | sd_extent_header | sd_extent | sd_extent | ...... |
+ *     +------------------+-----------+-----------+--------+
+ *                              |          |
+ *                             /            \
+ *                            /              \
+ *                           /                \
+ *     +------------+ <------                  ----> +------------+
+ *     | data-obj 1 |                                | data-obj 2 |
+ *     +------------+                                +------------+
+ *
+ * After adding more oid into it, the leaf-node will be full of struct 
sd_extent
+ * and should be splited to two leaf-nodes, after it, the inode->data_vdi_id[]
+ * should become root-node which store sd_extent_idx and point to the two
+ * leaf-nodes:
+ *
+ *     +------------------+-----------------+-----------------+
+ *     | sd_extent_header |  sd_extent_idx  |  sd_extent_idx  |
+ *     +------------------+-----------------+-----------------+
+ *                              |                   |
+ *                             /                    \
+ *                            /                      -------------
+ *                           /                                    \
+ *                          /                                      \
+ *                         /                                        \
+ *     +------------------+-----------+-----------+--------+      
+------------------+-----------+-----------+--------+
+ *     | sd_extent_header | sd_extent | sd_extent | ...... |      | 
sd_extent_header | sd_extent | sd_extent | ...... |
+ *     +------------------+-----------+-----------+--------+      
+------------------+-----------+-----------+--------+
+ *                           /                \                                
             /           \
+ *     +------------+ <------                  ---> +------------+      
+--------------+ <--             --> +--------------+
+ *     | data-obj 1 |                               | data-obj 2 |      | 
data-obj 511 |                     | data-obj 512 |
+ *     +------------+                               +------------+      
+--------------+                     +--------------+
+ *
+ * When a leaf-node is full, we could add a new leaf-node and add a
+ * new sd_extent_idx in root-node to point to it:
+ *
+ *     +------------------+-----------------+-----------------+---------------+
+ *     | sd_extent_header |  sd_extent_idx  |  sd_extent_idx  | sd_extent_idx |
+ *     +------------------+-----------------+-----------------+---------------+
+ *                              |                   |                 \
+ *                             /                    \                  \     
(new leaf-node)
+ *                            /                      ---------          ------ 
+------------------+-----------+--------+
+ *                           /                                \                
| sd_extent_header | sd_extent | ...... |
+ *                          /                                  \               
+------------------+-----------+--------+
+ *                         /                                    \
+ *     +------------------+-----------+--------+      
+------------------+-----------+--------+
+ *     | sd_extent_header | sd_extent | ...... |      | sd_extent_header | 
sd_extent | ...... |
+ *     +------------------+-----------+--------+      
+------------------+-----------+--------+
+ *
+ *
+ * As above, the root-node point to leaf-node which point to data-obj
+ * (the implemention of B-tree in sd_inode only support two depth), so it could
+ * store:
+ *
+ *   (number of sd_extent_idx in root-node) * (number of sd_extent in 
leaf-node)
+ *
+ * which is 349524 * 524287 = 183250889388 data-objects (about 680 PB with 4MB 
data-objs).
+ *
+ */
 #include <string.h>
 
+#include "util.h"
 #include "sheepdog_proto.h"
 
-uint32_t sd_inode_get_vid(const struct sd_inode *inode, int idx)
+#define EXT_MAX_SPACE (SD_INODE_INDEX_SIZE - sizeof(struct sd_extent_header))
+#define EXT_MAX_ENTRIES (EXT_MAX_SPACE / sizeof(struct sd_extent))
+#define EXT_IDX_MAX_ENTRIES (EXT_MAX_SPACE / sizeof(struct sd_extent_idx))
+
+#define EXT_HEADER(data) ((struct sd_extent_header *)(data))
+#define FIRST_EXT(data)  ((struct sd_extent *)((char *)(data) + \
+                       sizeof(struct sd_extent_header)))
+#define LAST_EXT(data)   (FIRST_EXT(data) + EXT_HEADER(data)->entries)
+#define OFFSET_EXT(data, n) ((char *)(data) + sizeof(struct sd_extent_header) \
+                       + n * sizeof(struct sd_extent))
+
+#define EXT_MAX_IDXS (EXT_MAX_SPACE / sizeof(struct sd_extent_idx))
+#define FIRST_IDX(data)  ((struct sd_extent_idx *)((char *)(data) + \
+                       sizeof(struct sd_extent_header)))
+#define LAST_IDX(data)   (FIRST_IDX(data) + EXT_HEADER(data)->entries)
+#define OFFSET_IDX(data, n) ((char *)(data) + sizeof(struct sd_extent_header) \
+                       + n * sizeof(struct sd_extent_idx))
+
+struct find_path {
+       struct sd_extent_idx *p_idx;
+       struct sd_extent *p_ext;
+       struct sd_extent_header *p_ext_header;
+       int depth;
+};
+
+typedef int (*comp)(void *a, void *b);
+
+/* compare function for sd_extent */
+static int extent_comp(void *a, void *b)
+{
+       struct sd_extent *ea = (struct sd_extent *)a;
+       struct sd_extent *eb = (struct sd_extent *)b;
+
+       if (ea->idx > eb->idx)
+               return 1;
+       else if (ea->idx < eb->idx)
+               return -1;
+       else
+               return 0;
+}
+
+/* compare function for sd_extent_idx */
+static int index_comp(void *a, void *b)
+{
+       struct sd_extent_idx *ia = (struct sd_extent_idx *)a;
+       struct sd_extent_idx *ib = (struct sd_extent_idx *)b;
+
+       if (ia->idx > ib->idx)
+               return 1;
+       else if (ia->idx < ib->idx)
+               return -1;
+       else
+               return 0;
+}
+
+/* dump the information of B-tree */
+static void dump_btree(read_node_fn reader, struct sd_inode *inode)
+{
+#ifdef DEBUG
+       struct sd_extent_header *header = EXT_HEADER(inode->data_vdi_id);
+       struct sd_extent_header *leaf_node = NULL;
+       struct sd_extent *last, *iter;
+       struct sd_extent_idx *last_idx, *iter_idx;
+       void *tmp;
+
+       sd_info("btree> header: %u %u %u", header->magic,
+                       header->entries, header->depth);
+
+       if (header->depth == 1) {
+               last = LAST_EXT(inode->data_vdi_id);
+               iter = FIRST_EXT(inode->data_vdi_id);
+
+               while (iter != last) {
+                       sd_info("btree> ext: %d, %u", iter->idx, iter->vdi_id);
+                       iter++;
+               }
+       } else if (header->depth == 2) {
+               last_idx = LAST_IDX(inode->data_vdi_id);
+               iter_idx = FIRST_IDX(inode->data_vdi_id);
+               leaf_node = xmalloc(SD_INODE_INDEX_SIZE);
+               tmp = (void *)leaf_node;
+
+               while (iter_idx != last_idx) {
+                       reader(iter_idx->oid, &tmp, SD_INODE_INDEX_SIZE);
+
+                       sd_info("btree> %p idx: %d, %lu, %u",
+                                       iter_idx, iter_idx->idx, iter_idx->oid,
+                                       leaf_node->entries);
+                       last = LAST_EXT(leaf_node);
+                       iter = FIRST_EXT(leaf_node);
+                       while (iter != last) {
+                               sd_info("btree> ext in: %d, %u",
+                                               iter->idx, iter->vdi_id);
+                               iter++;
+                       }
+                       iter_idx++;
+               }
+
+               free(leaf_node);
+       } else
+               panic("This B-tree not support depth %u", header->depth);
+#endif
+}
+
+/*
+ * Search for the key in a B-tree node. If can't find it, return the position
+ * for insert operation. So we can't just use xbsearch().
+ */
+static void *binary_search(void *first, void *last, void *key,
+                          size_t obj_size, comp cmp)
 {
-       return inode->data_vdi_id[idx];
+       const char *l, *r, *m;
+       int ret;
+
+       l = (const char *)first;
+       r = (const char *)last - obj_size;
+       while (l <= r) {
+               m = l + ((r - l) / obj_size / 2) * obj_size;
+               ret = cmp((void *)key, (void *)m);
+               if (ret < 0)
+                       r = m - obj_size;
+               else if (ret > 0)
+                       l = m + obj_size;
+               else
+                       return (void *)m;
+       }
+       return (void *)l;
+}
+
+static void sd_inode_init(void *data, int depth)
+{
+       struct sd_extent_header *header = EXT_HEADER(data);
+       header->magic = INODE_BTREE_MAGIC;
+       header->depth = depth;
+       header->entries = 0;
+}
+
+/* check whether ext is in this node */
+static bool ext_in_range(struct sd_extent_header *header, struct sd_extent 
*ext)
+{
+       struct sd_extent *last = LAST_EXT(header);
+       if (last - ext > 0)
+               return true;
+       return false;
+}
+
+/* check whether idx is in this node */
+static bool idx_in_range(struct sd_extent_header *header,
+                               struct sd_extent_idx *idx)
+{
+       struct sd_extent_idx *last = LAST_IDX(header);
+       if (last - idx > 0)
+               return true;
+       return false;
+}
+
+/* search idx in leaf-node */
+static struct sd_extent *search_ext_entry(struct sd_extent_header *header,
+                                         uint32_t idx)
+{
+       struct sd_extent tmp;
+       tmp.idx = idx;
+       return binary_search(FIRST_EXT(header), LAST_EXT(header), &tmp,
+                       sizeof(struct sd_extent), extent_comp);
+}
+
+/* search idx in middle-node */
+static struct sd_extent_idx *search_idx_entry(struct sd_extent_header *header,
+                                             uint32_t idx)
+{
+       struct sd_extent_idx tmp;
+       tmp.idx = idx;
+       return binary_search(FIRST_IDX(header), LAST_IDX(header), &tmp,
+                       sizeof(struct sd_extent_idx), index_comp);
+}
+
+static void insert_ext_entry_nosearch(struct sd_extent_header *header,
+                                     struct sd_extent *ext, uint32_t idx,
+                                     uint32_t vdi_id)
+{
+       struct sd_extent *last = LAST_EXT(header);
+
+       memmove(ext + 1, ext, (last - ext) * sizeof(struct sd_extent));
+       ext->idx = idx;
+       ext->vdi_id = vdi_id;
+       header->entries++;
+}
+
+static void insert_idx_entry_nosearch(struct sd_extent_header *header,
+                                     struct sd_extent_idx *idx_ext,
+                                     uint32_t idx, uint64_t oid)
+{
+       struct sd_extent_idx *last = LAST_IDX(header);
+       memmove(idx_ext + 1, idx_ext,
+                       (last - idx_ext) * sizeof(struct sd_extent_idx));
+       idx_ext->idx = idx;
+       idx_ext->oid = oid;
+       header->entries++;
+}
+
+static void insert_idx_entry(struct sd_extent_header *header,
+                            uint32_t idx, uint64_t oid)
+{
+       struct sd_extent_idx *found;
+
+       if (header->entries >= EXT_MAX_IDXS)
+               goto out;
+
+       if (!header->entries) {
+               FIRST_IDX(header)->idx = idx;
+               FIRST_IDX(header)->oid = oid;
+               header->entries++;
+               goto out;
+       }
+
+       found = search_idx_entry(header, idx);
+       insert_idx_entry_nosearch(header, found, idx, oid);
+out:
+       return;
+}
+
+static void split_to_nodes(struct sd_extent_header *src,
+                          struct sd_extent_header *left,
+                          struct sd_extent_header *right, int num)
+{
+       memcpy(left, src, sizeof(struct sd_extent_header) +
+                       num * sizeof(struct sd_extent));
+       left->entries = num;
+
+       mempcpy(right, src, sizeof(struct sd_extent_header));
+       mempcpy(FIRST_EXT(right), OFFSET_EXT(src, num),
+                       (src->entries - num) * sizeof(struct sd_extent));
+       right->entries = src->entries - num;
+}
+
+/*
+ * The meta-data in inode is leaf-node at beginning, but after inserting too
+ * much sd_extent it will be full. When sd_extents is full, we need to create
+ * two new nodes, move sd_extents from inode to them and finally, let inode
+ * point to them.
+ */
+static void transfer_to_idx_root(write_node_fn writer, struct sd_inode *inode)
+{
+       struct sd_extent_header *left;
+       struct sd_extent_header *right;
+       struct sd_extent_header *root = EXT_HEADER(inode->data_vdi_id);
+       uint64_t left_oid, right_oid;
+       uint32_t num = root->entries / 2;
+
+       /* create two leaf-node and copy the entries from root-node */
+       left = xmalloc(SD_INODE_INDEX_SIZE);
+       right = xmalloc(SD_INODE_INDEX_SIZE);
+
+       split_to_nodes(root, left, right, num);
+
+       /* write two nodes back */
+       left_oid = vid_to_btree_oid(inode->vdi_id, inode->btree_counter++);
+       right_oid = vid_to_btree_oid(inode->vdi_id, inode->btree_counter++);
+
+       writer(left_oid, left, SD_INODE_INDEX_SIZE, inode->nr_copies,
+                       inode->copy_policy, 1);
+       writer(right_oid, right, SD_INODE_INDEX_SIZE, inode->nr_copies,
+                       inode->copy_policy, 1);
+
+       /* change root from ext-node to idx-node */
+       root->entries = 0;
+       root->depth = 2;
+       insert_idx_entry(root, (LAST_EXT(left) - 1)->idx, left_oid);
+       insert_idx_entry(root, (LAST_EXT(right) - 1)->idx, right_oid);
+
+       free(left);
+       free(right);
+}
+
+/*
+ * Search whole btree for 'idx'.
+ * Return available position (could insert new sd_extent) if can't find 'idx'.
+ */
+static int search_whole_btree(read_node_fn reader, const struct sd_inode 
*inode,
+                             uint32_t idx, struct find_path *path)
+{
+       struct sd_extent_header *header, *leaf_node;
+       void *tmp;
+       uint64_t oid;
+       int ret = SD_RES_BTREE_NOT_FOUND;
+
+       header = EXT_HEADER(inode->data_vdi_id);
+
+       /* root is idx-node */
+       if (header->depth == 2) {
+               path->depth = 2;
+               path->p_idx = search_idx_entry(header, idx);
+               leaf_node = xmalloc(SD_INODE_INDEX_SIZE);
+               tmp = (void *)leaf_node;
+
+               if (idx_in_range(header, path->p_idx)) {
+                       oid = path->p_idx->oid;
+                       ret = reader(oid, &tmp, SD_INODE_INDEX_SIZE);
+                       if (ret != SD_RES_SUCCESS)
+                               goto out;
+                       path->p_ext = search_ext_entry(leaf_node, idx);
+                       path->p_ext_header = leaf_node;
+                       if (ext_in_range(leaf_node, path->p_ext) &&
+                                       path->p_ext->idx == idx)
+                               ret = SD_RES_BTREE_FOUND;
+               } else {
+                       /* check if last idx-node has space */
+                       oid = (path->p_idx - 1)->oid;
+                       ret = reader(oid, &tmp, SD_INODE_INDEX_SIZE);
+                       if (ret != SD_RES_SUCCESS)
+                               goto out;
+                       if (leaf_node->entries < EXT_MAX_ENTRIES) {
+                               path->p_ext = search_ext_entry(leaf_node, idx);
+                               path->p_ext_header = leaf_node;
+                       }
+               }
+       } else if (header->depth == 1) {
+               path->depth = 1;
+               path->p_ext = search_ext_entry(header, idx);
+               if (ext_in_range(header, path->p_ext) &&
+                               path->p_ext->idx == idx)
+                       ret = SD_RES_BTREE_FOUND;
+               else
+                       ret = SD_RES_BTREE_NOT_FOUND;
+       }
+out:
+       return ret;
+}
+
+uint32_t sd_inode_get_vid(read_node_fn reader, const struct sd_inode *inode,
+                         uint32_t idx)
+{
+       struct find_path path;
+       int ret;
+
+       if (inode->store_policy == 0)
+               return inode->data_vdi_id[idx];
+       else {
+               /* btree is not init, so vdi is 0 */
+               if (inode->data_vdi_id[0] == 0)
+                       return 0;
+
+               memset(&path, 0, sizeof(path));
+               ret = search_whole_btree(reader, inode, idx, &path);
+               if (ret == SD_RES_BTREE_FOUND)
+                       return path.p_ext->vdi_id;
+               if (path.p_ext_header)
+                       free(path.p_ext_header);
+       }
+
+       return 0;
+}
+
+/*
+ * When the leaf-node is full, we need to create a new node and
+ * move half of the data into new one.
+ */
+static void split_ext_node(write_node_fn writer, struct sd_inode *inode,
+                          struct find_path *path)
+{
+       struct sd_extent_header *old = path->p_ext_header, *new_ext;
+       uint32_t num = old->entries / 2;
+       uint64_t new_oid;
+
+       new_ext = xmalloc(SD_INODE_INDEX_SIZE);
+
+       split_to_nodes(old, new_ext, old, num);
+
+       new_oid = vid_to_btree_oid(inode->vdi_id, inode->btree_counter++);
+       writer(new_oid, new_ext, SD_INODE_INDEX_SIZE, inode->nr_copies,
+                       inode->copy_policy, 1);
+       writer(path->p_idx->oid, old, SD_INODE_INDEX_SIZE, inode->nr_copies,
+                       inode->copy_policy, 0);
+
+       /* write new index */
+       insert_idx_entry(EXT_HEADER(inode->data_vdi_id),
+                       LAST_EXT(new_ext)->idx, new_oid);
+
+       free(new_ext);
+}
+
+/*
+ * Add new 'idx' and 'vdi_id' pair into leaf-node if depth equal 1 and
+ * add new leaf-node if there is no room for new 'idx' and 'vdi_id' pair.
+ */
+static int insert_new_node(write_node_fn writer, read_node_fn reader,
+                          struct sd_inode *inode, struct find_path *path,
+                          uint32_t idx, uint32_t vdi_id)
+{
+       struct sd_extent_header *header = EXT_HEADER(inode->data_vdi_id);
+       struct sd_extent_header *leaf_node = NULL;
+       uint64_t oid;
+       int ret = SD_RES_SUCCESS;
+
+       if (path->depth == 1) {
+               if (header->entries >= EXT_MAX_ENTRIES) {
+                       transfer_to_idx_root(writer, inode);
+                       ret = SD_RES_BTREE_REPEAT;
+                       goto out;
+               }
+               insert_ext_entry_nosearch(header,
+                               path->p_ext, idx, vdi_id);
+       } else if (path->depth == 2) {
+               if (idx_in_range(header, path->p_idx)) {
+                       if (!path->p_ext_header) {
+                               ret = SD_RES_BTREE_NOT_FOUND;
+                               goto out;
+                       }
+                       if (path->p_ext_header->entries >= EXT_MAX_ENTRIES) {
+                               split_ext_node(writer, inode, path);
+                               ret = SD_RES_BTREE_REPEAT;
+                               goto out;
+                       }
+                       insert_ext_entry_nosearch(path->p_ext_header,
+                                       path->p_ext, idx, vdi_id);
+                       writer(path->p_idx->oid, path->p_ext_header,
+                               SD_INODE_INDEX_SIZE, inode->nr_copies,
+                               inode->copy_policy, 1);
+               } else if (path->p_ext_header) {
+                       /* the last idx-node */
+                       insert_ext_entry_nosearch(path->p_ext_header,
+                                       path->p_ext, idx, vdi_id);
+                       path->p_idx--;
+                       path->p_idx->idx =
+                               (LAST_EXT(path->p_ext_header) - 1)->idx;
+                       writer(path->p_idx->oid, path->p_ext_header,
+                               SD_INODE_INDEX_SIZE, inode->nr_copies,
+                               inode->copy_policy, 1);
+               } else {
+                       /* create a new ext-node */
+                       leaf_node = xmalloc(SD_INODE_INDEX_SIZE);
+                       sd_inode_init(leaf_node, 2);
+                       oid = vid_to_btree_oid(inode->vdi_id,
+                                       inode->btree_counter++);
+                       insert_ext_entry_nosearch(leaf_node,
+                                       FIRST_EXT(leaf_node), idx, vdi_id);
+                       writer(oid, leaf_node, SD_INODE_INDEX_SIZE,
+                                       inode->nr_copies,
+                                       inode->copy_policy, 1);
+                       insert_idx_entry_nosearch(header, path->p_idx,
+                                       idx, oid);
+               }
+       }
+out:
+       if (leaf_node)
+               free(leaf_node);
+       return ret;
+}
+
+void sd_inode_set_vid(write_node_fn writer, read_node_fn reader,
+                     struct sd_inode *inode, uint32_t idx, uint32_t vdi_id)
+{
+       struct sd_extent_header *header;
+       struct find_path path;
+       int ret;
+
+       path.p_ext_header = NULL;
+
+       if (inode->store_policy == 0)
+               inode->data_vdi_id[idx] = vdi_id;
+       else {
+               if (inode->data_vdi_id[0] == 0)
+                       sd_inode_init(inode->data_vdi_id, 1);
+               header = EXT_HEADER(inode->data_vdi_id);
+               if (header->magic != INODE_BTREE_MAGIC)
+                       panic("%s() B-tree in inode is corrupt!", __func__);
+               while (1) {
+                       memset(&path, 0, sizeof(path));
+                       ret = search_whole_btree(reader, inode, idx, &path);
+                       if (ret == SD_RES_BTREE_FOUND) {
+                               path.p_ext->vdi_id = vdi_id;
+                               goto out;
+                       } else {
+                               ret = insert_new_node(writer, reader, inode,
+                                               &path, idx, vdi_id);
+                               if (SD_RES_BTREE_REPEAT == ret) {
+                                       if (path.p_ext_header)
+                                               free(path.p_ext_header);
+                                       continue;
+                               } else
+                                       goto out;
+                       }
+               }
+       }
+out:
+       if (path.p_ext_header)
+               free(path.p_ext_header);
+       dump_btree(reader, inode);
 }
 
-void sd_inode_set_vid(struct sd_inode *inode, int idx, uint32_t vdi_id)
+void sd_inode_copy_vdis(struct sd_inode *oldi, struct sd_inode *newi)
 {
-       inode->data_vdi_id[idx] = vdi_id;
+       memcpy(newi->data_vdi_id, oldi->data_vdi_id, sizeof(newi->data_vdi_id));
 }
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 83b461c..e402fcd 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -376,6 +376,15 @@ void objlist_cache_remove(uint64_t oid);
 
 void put_request(struct request *req);
 
+int sheep_bnode_writer(uint64_t oid, void *mem, unsigned int len,
+                      int copies, int copy_policy, int create);
+int sheep_bnode_reader(uint64_t oid, void **mem, unsigned int len);
+
+#define INODE_GET_VID(inode, idx) (sd_inode_get_vid(sheep_bnode_reader, \
+                                       inode, idx))
+#define INODE_SET_VID(inode, idx, vdi_id) 
(sd_inode_set_vid(sheep_bnode_writer,\
+                                       sheep_bnode_reader, inode, idx, vdi_id))
+
 /* Operations */
 
 const struct sd_op_template *get_sd_op(uint8_t opcode);
diff --git a/sheep/vdi.c b/sheep/vdi.c
index f493dee..2403bcc 100644
--- a/sheep/vdi.c
+++ b/sheep/vdi.c
@@ -29,6 +29,17 @@ static struct sd_lock vdi_state_lock = SD_LOCK_INITIALIZER;
  */
 int ec_max_data_strip;
 
+int sheep_bnode_writer(uint64_t oid, void *mem, unsigned int len,
+                      int copies, int copy_policy, int create)
+{
+       return write_object(oid, mem, len, 0, create == 1);
+}
+
+int sheep_bnode_reader(uint64_t oid, void **mem, unsigned int len)
+{
+       return read_object(oid, *mem, len, 0);
+}
+
 static int vdi_state_cmp(const struct vdi_state_entry *a,
                         const struct vdi_state_entry *b)
 {
@@ -847,7 +858,7 @@ static void delete_one(struct work *work)
        nr_objs = count_data_objs(inode);
        for (nr_deleted = 0, i = 0; i < nr_objs; i++) {
                uint64_t oid;
-               uint32_t vid = sd_inode_get_vid(inode, i);
+               uint32_t vid = INODE_GET_VID(inode, i);
 
                if (!vid)
                        continue;
diff --git a/sheepfs/volume.c b/sheepfs/volume.c
index 29abb62..000d533 100644
--- a/sheepfs/volume.c
+++ b/sheepfs/volume.c
@@ -64,6 +64,24 @@ struct vdi_inode {
 static struct rb_root vdi_inode_tree = RB_ROOT;
 static struct sd_lock vdi_inode_tree_lock = SD_LOCK_INITIALIZER;
 
+
+static int sheepfs_bnode_writer(uint64_t oid, void *mem, unsigned int len,
+                               int copies, int copy_policy, int create);
+static int sheepfs_bnode_reader(uint64_t oid, void **mem, unsigned int len);
+
+#define INODE_GET_VID(inode, idx) (sd_inode_get_vid(\
+                                       sheepfs_bnode_reader, inode, idx))
+#define INODE_SET_VID(inode, idx, vdi_id) (sd_inode_set_vid( \
+                                       sheepfs_bnode_writer, \
+                                       sheepfs_bnode_reader, \
+                                       inode, idx, vdi_id))
+
+static inline bool is_data_obj_writeable(const struct sd_inode *inode,
+                                        uint32_t idx)
+{
+       return inode->vdi_id == INODE_GET_VID(inode, idx);
+}
+
 static int vdi_inode_cmp(const struct vdi_inode *a, const struct vdi_inode *b)
 {
        return intcmp(a->vid, b->vid);
@@ -129,7 +147,7 @@ static int volume_rw_object(char *buf, uint64_t oid, size_t 
size,
        if (is_data_obj(oid)) {
                idx = data_oid_to_idx(oid);
                assert(vdi);
-               vdi_id = sd_inode_get_vid(vdi->inode, idx);
+               vdi_id = INODE_GET_VID(vdi->inode, idx);
                if (!vdi_id) {
                        /* if object doesn't exist, we'er done */
                        if (rw == VOLUME_READ) {
@@ -152,7 +170,7 @@ static int volume_rw_object(char *buf, uint64_t oid, size_t 
size,
        if (rw == VOLUME_READ)
                hdr.opcode = SD_OP_READ_OBJ;
        else {
-               hdr.opcode = create ?
+               hdr.opcode = (create || is_vdi_btree_obj(oid)) ?
                        SD_OP_CREATE_AND_WRITE_OBJ : SD_OP_WRITE_OBJ;
                hdr.flags |= SD_FLAG_CMD_WRITE;
        }
@@ -176,7 +194,7 @@ static int volume_rw_object(char *buf, uint64_t oid, size_t 
size,
        }
 
        if (create) {
-               sd_inode_set_vid(vdi->inode, idx, vid);
+               INODE_SET_VID(vdi->inode, idx, vid);
                /* writeback inode update */
                if (volume_rw_object((char *)&vid, vid_to_vdi_oid(vid),
                                     sizeof(vid),
@@ -231,6 +249,25 @@ static int volume_do_rw(const char *path, char *buf, 
size_t size,
        return 0;
 }
 
+int sheepfs_bnode_writer(uint64_t oid, void *mem, unsigned int len,
+                        int copies, int copy_policy, int create)
+{
+       int ret;
+       ret = volume_rw_object(mem, oid, len, 0, VOLUME_WRITE);
+       if (ret == len)
+               return SD_RES_SUCCESS;
+       return ret;
+}
+
+int sheepfs_bnode_reader(uint64_t oid, void **mem, unsigned int len)
+{
+       int ret;
+       ret = volume_rw_object(*mem, oid, len, 0, VOLUME_READ);
+       if (ret == len)
+               return SD_RES_SUCCESS;
+       return ret;
+}
+
 int volume_read(const char *path, char *buf, size_t size, off_t offset)
 {
 
-- 
1.7.1

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to