From: Liu Yuan <[email protected]>

Object cache writethrough mode provides us with a read-only cache which is
always consistent with the backend store.

We can set the object cache mode with the 'w' option as follows:

 sheep -w cache_size{,writethrough | writeback}

For example, we can set up a 1G writethrough cache:
 $ sheep -w 1000,writethrough
 $ sheep -w 1000

Writethrough is the default object cache mode.

To use a writeback cache instead:
 $ sheep -w 1000,writeback

Signed-off-by: Liu Yuan <[email protected]>
---
 sheep/object_cache.c |  125 +++++++++++++++++++++++++++++++++++++-------------
 sheep/sheep.c        |   22 ++++++---
 sheep/sheep_priv.h   |    1 +
 3 files changed, 109 insertions(+), 39 deletions(-)

diff --git a/sheep/object_cache.c b/sheep/object_cache.c
index bb14fb8..6b102d4 100644
--- a/sheep/object_cache.c
+++ b/sheep/object_cache.c
@@ -53,6 +53,18 @@ struct global_cache {
        struct cds_list_head cache_lru_list;
 };
 
+struct object_cache_entry {
+       uint32_t idx;
+       int refcnt;
+       uint64_t bmap; /* each bit represents one dirty block in object */
+       struct object_cache *oc;
+       struct rb_node node;
+       struct rb_node dirty_node;
+       struct list_head dirty_list;
+       struct list_head object_list;
+       struct cds_list_head lru_list;
+};
+
 struct object_cache {
        uint32_t vid;
        struct hlist_node hash;
@@ -63,18 +75,9 @@ struct object_cache {
        struct rb_root object_tree;
 
        pthread_rwlock_t lock;
-};
 
-struct object_cache_entry {
-       uint32_t idx;
-       int refcnt;
-       uint64_t bmap; /* each bit represents one dirty block in object */
-       struct object_cache *oc;
-       struct rb_node node;
-       struct rb_node dirty_node;
-       struct list_head dirty_list;
-       struct list_head object_list;
-       struct cds_list_head lru_list;
+       int (*read)(struct object_cache_entry *, void *, size_t, off_t);
+       int (*write)(struct object_cache_entry *, void *, size_t, off_t, int);
 };
 
 static struct global_cache sys_cache;
@@ -280,11 +283,11 @@ static inline void lru_move_entry(struct object_cache_entry *entry)
 
 static inline void update_cache_entry(struct object_cache_entry *entry,
                                      uint32_t idx, size_t datalen,
-                                     off_t offset, int wrt)
+                                     off_t offset, int dirty)
 {
        struct object_cache *oc = entry->oc;
 
-       if (wrt) {
+       if (dirty) {
                uint64_t bmap = calc_object_bmap(datalen, offset);
 
                pthread_rwlock_wrlock(&oc->lock);
@@ -295,13 +298,12 @@ static inline void update_cache_entry(struct object_cache_entry *entry,
        lru_move_entry(entry);
 }
 
-static int write_cache_object(struct object_cache_entry *entry, void *buf,
-                             size_t count, off_t offset)
+static int read_cache_object_noupdate(uint32_t vid, uint32_t idx, void *buf,
+                                     size_t count, off_t offset)
 {
        size_t size;
        int fd, flags = def_open_flags, ret = SD_RES_SUCCESS;
        struct strbuf p;
-       uint32_t vid = entry->oc->vid, idx = entry_idx(entry);
 
        strbuf_init(&p, PATH_MAX);
        strbuf_addstr(&p, cache_dir);
@@ -317,12 +319,12 @@ static int write_cache_object(struct object_cache_entry *entry, void *buf,
                goto out;
        }
 
-       if (flock(fd, LOCK_EX) < 0) {
+       if (flock(fd, LOCK_SH) < 0) {
                ret = SD_RES_EIO;
                eprintf("%m\n");
                goto out_close;
        }
-       size = xpwrite(fd, buf, count, offset);
+       size = xpread(fd, buf, count, offset);
        if (flock(fd, LOCK_UN) < 0) {
                ret = SD_RES_EIO;
                eprintf("%m\n");
@@ -336,7 +338,6 @@ static int write_cache_object(struct object_cache_entry *entry, void *buf,
                goto out_close;
        }
 
-       update_cache_entry(entry, idx, count, offset, 1);
 out_close:
        close(fd);
 out:
@@ -344,8 +345,8 @@ out:
        return ret;
 }
 
-static int read_cache_object_noupdate(uint32_t vid, uint32_t idx, void *buf,
-                                     size_t count, off_t offset)
+static int write_cache_object_noupdate(uint32_t vid, uint32_t idx, void *buf,
+                                      size_t count, off_t offset)
 {
        size_t size;
        int fd, flags = def_open_flags, ret = SD_RES_SUCCESS;
@@ -365,12 +366,12 @@ static int read_cache_object_noupdate(uint32_t vid, uint32_t idx, void *buf,
                goto out;
        }
 
-       if (flock(fd, LOCK_SH) < 0) {
+       if (flock(fd, LOCK_EX) < 0) {
                ret = SD_RES_EIO;
                eprintf("%m\n");
                goto out_close;
        }
-       size = xpread(fd, buf, count, offset);
+       size = xpwrite(fd, buf, count, offset);
        if (flock(fd, LOCK_UN) < 0) {
                ret = SD_RES_EIO;
                eprintf("%m\n");
@@ -390,6 +391,20 @@ out:
        strbuf_release(&p);
        return ret;
 }
+
+static int write_cache_object(struct object_cache_entry *entry, void *buf,
+                             size_t count, off_t offset, int create)
+{
+       uint32_t vid = entry->oc->vid, idx = entry_idx(entry);
+       int ret;
+
+       ret = write_cache_object_noupdate(vid, idx, buf, count, offset);
+
+       if (ret == SD_RES_SUCCESS)
+               update_cache_entry(entry, idx, count, offset, 1);
+       return ret;
+}
+
 static int read_cache_object(struct object_cache_entry *entry, void *buf,
                             size_t count, off_t offset)
 {
@@ -403,6 +418,40 @@ static int read_cache_object(struct object_cache_entry *entry, void *buf,
        return ret;
 }
 
+static int write_and_push_cache_object(struct object_cache_entry *entry,
+                                      void *buf, size_t count, off_t offset,
+                                      int create)
+{
+       uint32_t vid = entry->oc->vid, idx = entry_idx(entry);
+       uint64_t oid = idx_to_oid(vid, idx);
+       struct sd_req hdr;
+       int ret;
+
+       ret = write_cache_object_noupdate(vid, idx, buf, count, offset);
+
+       if (ret != SD_RES_SUCCESS)
+               return ret;
+
+       if (create)
+               sd_init_req(&hdr, SD_OP_CREATE_AND_WRITE_OBJ);
+       else
+               sd_init_req(&hdr, SD_OP_WRITE_OBJ);
+       hdr.flags =  SD_FLAG_CMD_WRITE;
+       hdr.data_length = count;
+
+       hdr.obj.oid = oid;
+       hdr.obj.offset = offset;
+
+       ret = exec_local_req(&hdr, buf);
+       if (ret != SD_RES_SUCCESS) {
+               eprintf("failed to write object %" PRIx64 ", %x\n", oid, ret);
+               return ret;
+       }
+
+       update_cache_entry(entry, idx, count, offset, 0);
+       return ret;
+}
+
 static int push_cache_object(uint32_t vid, uint32_t idx, uint64_t bmap,
                             int create)
 {
@@ -589,8 +638,15 @@ not_found:
 
                pthread_rwlock_init(&cache->lock, NULL);
                hlist_add_head(&cache->hash, head);
-       } else
+
+               cache->read = read_cache_object;
+               if (sys->writethrough)
+                       cache->write = write_and_push_cache_object;
+               else
+                       cache->write = write_cache_object;
+       } else {
                cache = NULL;
+       }
 out:
        pthread_mutex_unlock(&hashtable_lock[h]);
        return cache;
@@ -690,10 +746,14 @@ static int object_cache_lookup(struct object_cache *oc, uint32_t idx,
                data_length = SD_DATA_OBJ_SIZE;
 
        ret = prealloc(fd, data_length);
-       if (ret != SD_RES_SUCCESS)
+       if (ret != SD_RES_SUCCESS) {
                ret = SD_RES_EIO;
-       else
-               add_to_object_cache(oc, idx, 1);
+       } else {
+               if (sys->writethrough)
+                       add_to_object_cache(oc, idx, 0);
+               else
+                       add_to_object_cache(oc, idx, 1);
+       }
        close(fd);
 out:
        strbuf_release(&buf);
@@ -990,17 +1050,16 @@ retry:
        }
 
        if (hdr->flags & SD_FLAG_CMD_WRITE) {
-               ret = write_cache_object(entry, req->data, hdr->data_length,
-                                        hdr->obj.offset);
+               ret = cache->write(entry, req->data, hdr->data_length,
+                                 hdr->obj.offset, create);
                if (ret != SD_RES_SUCCESS)
                        goto err;
        } else {
-               ret = read_cache_object(entry, req->data, hdr->data_length,
-                                       hdr->obj.offset);
+               ret = cache->read(entry, req->data, hdr->data_length,
+                                  hdr->obj.offset);
                if (ret != SD_RES_SUCCESS)
                        goto err;
                req->rp.data_length = hdr->data_length;
-
        }
 err:
        put_cache_entry(entry);
@@ -1026,7 +1085,7 @@ int object_cache_write(uint64_t oid, char *data, unsigned int datalen,
                return SD_RES_NO_CACHE;
        }
 
-       ret = write_cache_object(entry, data, datalen, offset);
+       ret = write_cache_object(entry, data, datalen, offset, create);
 
        put_cache_entry(entry);
 
diff --git a/sheep/sheep.c b/sheep/sheep.c
index 52a294b..7f34a87 100644
--- a/sheep/sheep.c
+++ b/sheep/sheep.c
@@ -192,8 +192,9 @@ int main(int argc, char **argv)
        int af;
        char *p;
        struct cluster_driver *cdrv;
-       int enable_write_cache = 0; /* disabled by default */
+       int enable_object_cache = 0; /* disabled by default */
        char *pid_file = NULL;
+       char *object_cache_size, *object_cache_mode;
 
        signal(SIGPIPE, SIG_IGN);
 
@@ -261,8 +262,10 @@ int main(int argc, char **argv)
                        sys->this_node.zone = zone;
                        break;
                case 'w':
-                       enable_write_cache = 1;
-                       cache_size = strtol(optarg, &p, 10);
+                       enable_object_cache = 1;
+                       object_cache_size = strtok(optarg, ",");
+                       object_cache_mode = strtok(NULL, ",");
+                       cache_size = strtol(object_cache_size, &p, 10);
                        if (optarg == p || cache_size < 0 ||
                            UINT64_MAX < cache_size) {
                                fprintf(stderr, "Invalid cache size '%s': "
@@ -270,9 +273,16 @@ int main(int argc, char **argv)
                                        optarg, UINT64_MAX);
                                exit(1);
                        }
-                       vprintf(SDOG_INFO, "enable write cache, max cache size 
%" PRIu64 "M\n",
-                               cache_size);
                        sys->cache_size = cache_size * 1024 * 1024;
+
+                       if (!object_cache_mode ||
+                           strcmp(object_cache_mode, "writeback") != 0) {
+                               sys->writethrough = 1;
+                       }
+                       vprintf(SDOG_INFO, "enable write cache, "
+                               "max cache size %" PRIu64 "M, %s mode\n",
+                               cache_size, sys->writethrough ?
+                               "writethrough" : "writeback");
                        break;
                case 'v':
                        nr_vnodes = strtol(optarg, &p, 10);
@@ -326,7 +336,7 @@ int main(int argc, char **argv)
        if (ret)
                exit(1);
 
-       ret = init_store(dir, enable_write_cache);
+       ret = init_store(dir, enable_object_cache);
        if (ret)
                exit(1);
 
diff --git a/sheep/sheep_priv.h b/sheep/sheep_priv.h
index 0c0e588..0c30851 100644
--- a/sheep/sheep_priv.h
+++ b/sheep/sheep_priv.h
@@ -119,6 +119,7 @@ struct cluster_info {
        int use_directio;
        uint8_t gateway_only;
        uint8_t disable_recovery;
+       uint8_t writethrough;
 
        struct work_queue *gateway_wqueue;
        struct work_queue *io_wqueue;
-- 
1.7.10.2

-- 
sheepdog mailing list
[email protected]
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to