From: Liu Yuan <tailai...@taobao.com>

- change the aio framework
  -- add a dedicated submitter kthread to avoid lockup
- flesh out sheep_handle_reply and sheep_aiocb_submit

Now we have two kthreads: one for request submission and the other for
reply handling.

The tricky part is that sheep_submit_sdreq() will be called by both kthreads,
so we add a mutex to prevent the sent data from intermingling; intermingled
sends would cause a very tricky bug where the sheep client gets a corrupted
sd_req structure and replies with a bogus sd_rsp.

The main problems for the aio framework are,
- split the striding request into individual sheep requests, one per object.
- send only one create request if the object isn't created yet, and block the
  requests to this object until the creation succeeds, then send the blocked
  requests.

To get the best performance,

# echo 4096 > /sys/block/sbd0/queue/max_sectors_kb

This means the IO scheduler will try its best to hand us 4MB requests.

Signed-off-by: Liu Yuan <namei.u...@gmail.com>
---
 sbd/sbd.h                |  16 ++-
 sbd/sheep.c              | 261 +++++++++++++++++++++++++++++++++++++++++------
 sbd/sheep_block_device.c |  78 +++++++++++---
 3 files changed, 306 insertions(+), 49 deletions(-)

diff --git a/sbd/sbd.h b/sbd/sbd.h
index a2252ff..dbabc7a 100644
--- a/sbd/sbd.h
+++ b/sbd/sbd.h
@@ -43,13 +43,19 @@ struct sbd_device {
        spinlock_t queue_lock;   /* request queue lock */
 
        struct sheep_vdi vdi;           /* Associated sheep image */
+       spinlock_t vdi_lock;
 
-       struct list_head inflight_head;
-       wait_queue_head_t inflight_wq;
-       struct list_head blocking_head;
+       struct list_head request_head; /* protected by queue lock */
+       struct list_head inflight_head; /* for inflight sheep requests */
+       struct list_head blocking_head; /* for blocking sheep requests */
+       rwlock_t inflight_lock;
+       rwlock_t blocking_lock;
 
        struct list_head list;
        struct task_struct *reaper;
+       struct task_struct *submiter;
+       wait_queue_head_t reaper_wq;
+       wait_queue_head_t submiter_wq;
 };
 
 struct sheep_aiocb {
@@ -57,10 +63,10 @@ struct sheep_aiocb {
        u64 offset;
        u64 length;
        int ret;
-       u32 nr_requests;
+       atomic_t nr_requests;
        char *buf;
        int buf_iter;
-       void (*aio_done_func)(struct sheep_aiocb *, bool);
+       void (*aio_done_func)(struct sheep_aiocb *);
 };
 
 enum sheep_request_type {
diff --git a/sbd/sheep.c b/sbd/sheep.c
index 33269b4..bcc65f0 100644
--- a/sbd/sheep.c
+++ b/sbd/sheep.c
@@ -11,12 +11,25 @@
 
 #include "sbd.h"
 
+/* FIXME I need this hack to compile DEFINE_MUTEX successfully */
+#ifdef __SPIN_LOCK_UNLOCKED
+# undef __SPIN_LOCK_UNLOCKED
+# define __SPIN_LOCK_UNLOCKED(lockname) __SPIN_LOCK_INITIALIZER(lockname)
+#endif
+
+static DEFINE_MUTEX(socket_mutex);
+
 void socket_shutdown(struct socket *sock)
 {
        if (sock)
                kernel_sock_shutdown(sock, SHUT_RDWR);
 }
 
+static struct sbd_device *sheep_request_to_device(struct sheep_request *req)
+{
+       return req->aiocb->request->q->queuedata;
+}
+
 static struct sbd_device *sheep_aiocb_to_device(struct sheep_aiocb *aiocb)
 {
        return aiocb->request->q->queuedata;
@@ -49,7 +62,7 @@ static int socket_create(struct socket **sock, const char 
*ip_addr, int port)
                              (char *)&nodelay, sizeof(nodelay));
        set_fs(oldmm);
        if (ret != 0) {
-               pr_err("Can't set SO_LINGER: %d\n", ret);
+               pr_err("Can't set nodelay: %d\n", ret);
                goto shutdown;
        }
 
@@ -129,14 +142,20 @@ static int socket_write(struct socket *sock, void *buf, 
int len)
 static int sheep_submit_sdreq(struct socket *sock, struct sd_req *hdr,
                              void *data, unsigned int wlen)
 {
-       int ret = socket_write(sock, hdr, sizeof(*hdr));
+       int ret;
 
+       /* Make sheep_submit_sdreq thread safe */
+       mutex_lock(&socket_mutex);
+
+       ret = socket_write(sock, hdr, sizeof(*hdr));
        if (ret < 0)
-               return ret;
+               goto out;
 
        if (wlen)
-               return socket_write(sock, data, wlen);
-       return 0;
+               ret = socket_write(sock, data, wlen);
+out:
+       mutex_unlock(&socket_mutex);
+       return ret;
 }
 
 /* Run the request synchronously */
@@ -252,33 +271,58 @@ out:
 
 static void submit_sheep_request(struct sheep_request *req)
 {
+       struct sd_req hdr = {};
+       struct sbd_device *dev = sheep_request_to_device(req);
+
+       hdr.id = req->seq_num;
+       hdr.data_length = req->length;
+       hdr.obj.oid = req->oid;
+       hdr.obj.offset = req->offset;
+
+       write_lock(&dev->inflight_lock);
+       BUG_ON(!list_empty(&req->list));
+       list_add_tail(&req->list, &dev->inflight_head);
+       write_unlock(&dev->inflight_lock);
+
+       switch (req->type) {
+       case SHEEP_CREATE:
+       case SHEEP_WRITE:
+               if (req->type == SHEEP_CREATE)
+                       hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
+               else
+                       hdr.opcode = SD_OP_WRITE_OBJ;
+               hdr.flags = SD_FLAG_CMD_WRITE | SD_FLAG_CMD_DIRECT;
+               sheep_submit_sdreq(dev->sock, &hdr, req->buf, req->length);
+               break;
+       case SHEEP_READ:
+               hdr.opcode = SD_OP_READ_OBJ;
+               sheep_submit_sdreq(dev->sock, &hdr, NULL, 0);
+               break;
+       }
+       sbd_debug("add oid %llx off %d, len %d, seq %u\n", req->oid,
+                 req->offset, req->length, req->seq_num);
+       wake_up(&dev->reaper_wq);
 }
 
 static inline void free_sheep_aiocb(struct sheep_aiocb *aiocb)
 {
-       kfree(aiocb->buf);
+       vfree(aiocb->buf);
        kfree(aiocb);
 }
 
-static void aio_write_done(struct sheep_aiocb *aiocb, bool locked)
+static void aio_write_done(struct sheep_aiocb *aiocb)
 {
-       sbd_debug("off %llu, len %llu\n", aiocb->offset, aiocb->length);
+       sbd_debug("wdone off %llu, len %llu\n", aiocb->offset, aiocb->length);
 
-       if (locked)
-               __blk_end_request_all(aiocb->request, aiocb->ret);
-       else
-               blk_end_request_all(aiocb->request, aiocb->ret);
+       blk_end_request_all(aiocb->request, aiocb->ret);
        free_sheep_aiocb(aiocb);
 }
 
-static void aio_read_done(struct sheep_aiocb *aiocb, bool locked)
+static void aio_read_done(struct sheep_aiocb *aiocb)
 {
-       sbd_debug("off %llu, len %llu\n", aiocb->offset, aiocb->length);
+       sbd_debug("rdone off %llu, len %llu\n", aiocb->offset, aiocb->length);
 
-       if (locked)
-               __blk_end_request_all(aiocb->request, aiocb->ret);
-       else
-               blk_end_request_all(aiocb->request, aiocb->ret);
+       blk_end_request_all(aiocb->request, aiocb->ret);
        free_sheep_aiocb(aiocb);
 }
 
@@ -294,11 +338,16 @@ struct sheep_aiocb *sheep_aiocb_setup(struct request *req)
 
        aiocb->offset = blk_rq_pos(req) * SECTOR_SIZE;
        aiocb->length = blk_rq_bytes(req);
-       aiocb->nr_requests = 0;
        aiocb->ret = 0;
        aiocb->buf_iter = 0;
        aiocb->request = req;
-       aiocb->buf = kzalloc(aiocb->length, GFP_KERNEL);
+       aiocb->buf = vzalloc(aiocb->length);
+       atomic_set(&aiocb->nr_requests, 0);
+
+       if (!aiocb->buf) {
+               kfree(aiocb);
+               return ERR_PTR(-ENOMEM);
+       }
 
        switch (rq_data_dir(req)) {
        case WRITE:
@@ -343,6 +392,7 @@ static struct sheep_request *alloc_sheep_request(struct 
sheep_aiocb *aiocb,
        req->aiocb = aiocb;
        req->buf = aiocb->buf + aiocb->buf_iter;
        req->seq_num = atomic_inc_return(&dev->seq_num);
+       INIT_LIST_HEAD(&req->list);
 
        switch (rq_data_dir(aiocb->request)) {
        case WRITE:
@@ -359,23 +409,51 @@ static struct sheep_request *alloc_sheep_request(struct 
sheep_aiocb *aiocb,
        }
 
        aiocb->buf_iter += len;
-       aiocb->nr_requests++;
+       atomic_inc(&aiocb->nr_requests);
 
        return req;
 }
 
-static void end_sheep_request(struct sheep_request *req, bool queue_locked)
+static void end_sheep_request(struct sheep_request *req)
 {
        struct sheep_aiocb *aiocb = req->aiocb;
 
-       if (--aiocb->nr_requests == 0)
-               aiocb->aio_done_func(aiocb, queue_locked);
-
        sbd_debug("end oid %llx off %d, len %d, seq %u\n", req->oid,
                  req->offset, req->length, req->seq_num);
+
+       if (atomic_dec_return(&aiocb->nr_requests) <= 0)
+               aiocb->aio_done_func(aiocb);
+       BUG_ON(!list_empty(&req->list));
        kfree(req);
 }
 
+static struct sheep_request *find_inflight_request_oid(struct sbd_device *dev,
+                                                      uint64_t oid)
+{
+       struct sheep_request *req;
+
+       read_lock(&dev->inflight_lock);
+       list_for_each_entry(req, &dev->inflight_head, list) {
+               if (req->oid == oid) {
+                       read_unlock(&dev->inflight_lock);
+                       return req;
+               }
+       }
+       read_unlock(&dev->inflight_lock);
+       return NULL;
+}
+
+static bool sheep_inode_has_idx(struct sbd_device *dev, u32 idx)
+{
+       spin_lock(&dev->vdi_lock);
+       if (dev->vdi.inode->data_vdi_id[idx]) {
+               spin_unlock(&dev->vdi_lock);
+               return true;
+       }
+       spin_unlock(&dev->vdi_lock);
+       return false;
+}
+
 int sheep_aiocb_submit(struct sheep_aiocb *aiocb)
 {
        struct sbd_device *dev = sheep_aiocb_to_device(aiocb);
@@ -384,35 +462,59 @@ int sheep_aiocb_submit(struct sheep_aiocb *aiocb)
        u64 start = offset % SD_DATA_OBJ_SIZE;
        u32 vid = dev->vdi.vid;
        u64 oid = vid_to_data_oid(vid, offset / SD_DATA_OBJ_SIZE);
-       u32 idx = data_oid_to_idx(oid);
        int len = SD_DATA_OBJ_SIZE - start;
 
        if (total < len)
                len = total;
 
-       sbd_debug("submit oid %llx off %llu, len %llu\n", oid, offset, total);
+       sbd_debug("submit off %llu, len %llu\n", offset, total);
        /*
         * Make sure we don't free the aiocb before we are done with all
         * requests.This additional reference is dropped at the end of this
         * function.
         */
-       aiocb->nr_requests++;
+       atomic_inc(&aiocb->nr_requests);
 
        do {
                struct sheep_request *req;
+               u32 idx = data_oid_to_idx(oid);
 
                req = alloc_sheep_request(aiocb, oid, len, start);
                if (IS_ERR(req))
                        return PTR_ERR(req);
 
-               if (likely(dev->vdi.inode->data_vdi_id[idx]))
+               if (likely(sheep_inode_has_idx(dev, idx)))
                        goto submit;
 
                /* Object is not created yet... */
                switch (req->type) {
                case SHEEP_WRITE:
+                       /*
+                        * Sheepdog can't handle concurrent creation on the same
+                        * object. We send one create req first and then send
+                        * write reqs in next.
+                        */
+                       if (find_inflight_request_oid(dev, oid)) {
+                               write_lock(&dev->blocking_lock);
+                               /*
+                                * There are slim chance object was created
+                                * before we grab blocking_lock
+                                */
+                               if (unlikely(sheep_inode_has_idx(dev, idx))) {
+                                       write_unlock(&dev->blocking_lock);
+                                       goto submit;
+                               }
+                               list_add_tail(&req->list, &dev->blocking_head);
+                               sbd_debug("block oid %llx off %d, len %d,"
+                                         " seq %u\n", req->oid, req->offset,
+                                         req->length, req->seq_num);
+                               write_unlock(&dev->blocking_lock);
+                               goto done;
+                       }
+                       req->type = SHEEP_CREATE;
+                       break;
                case SHEEP_READ:
-                       end_sheep_request(req, true);
+                       end_sheep_request(req);
                        goto done;
                }
 submit:
@@ -424,13 +526,108 @@ done:
                len = total > SD_DATA_OBJ_SIZE ? SD_DATA_OBJ_SIZE : total;
        } while (total > 0);
 
-       if (--aiocb->nr_requests == 0)
-               aiocb->aio_done_func(aiocb, true);
+       if (atomic_dec_return(&aiocb->nr_requests) <= 0)
+               aiocb->aio_done_func(aiocb);
 
        return 0;
 }
 
+static struct sheep_request *fetch_inflight_request(struct sbd_device *dev,
+                                                   u32 seq_num)
+{
+       struct sheep_request *req, *t;
+
+       write_lock(&dev->inflight_lock);
+       list_for_each_entry_safe(req, t, &dev->inflight_head, list) {
+               if (req->seq_num == seq_num) {
+                       list_del_init(&req->list);
+                       goto out;
+               }
+       }
+       req = NULL;
+out:
+       write_unlock(&dev->inflight_lock);
+       return req;
+}
+
+static void submit_blocking_sheep_request(struct sbd_device *dev, uint64_t oid)
+{
+       struct sheep_request *req, *t;
+
+       write_lock(&dev->blocking_lock);
+       list_for_each_entry_safe(req, t, &dev->blocking_head, list) {
+               if (req->oid != oid)
+                       continue;
+               list_del_init(&req->list);
+               submit_sheep_request(req);
+       }
+       write_unlock(&dev->blocking_lock);
+}
+
 int sheep_handle_reply(struct sbd_device *dev)
 {
+       struct sd_rsp rsp = {};
+       struct sheep_request *req, *new;
+       uint32_t vid, idx;
+       uint64_t oid;
+       int ret;
+
+       ret = socket_read(dev->sock, (char *)&rsp, sizeof(rsp));
+       if (ret < 0) {
+               pr_err("failed to read reply header\n");
+               goto err;
+       }
+
+       req = fetch_inflight_request(dev, rsp.id);
+       if (!req) {
+               pr_err("failed to find req %u\n", rsp.id);
+               return 0;
+       }
+       if (rsp.data_length > 0) {
+               ret = socket_read(dev->sock, req->buf, req->length);
+               if (ret < 0) {
+                       pr_err("failed to read reply payload\n");
+                       goto err;
+               }
+       }
+
+       switch (req->type) {
+       case SHEEP_CREATE:
+               /* We need to update inode for create */
+               new = kmalloc(sizeof(*new), GFP_KERNEL);
+               if (!new) {
+                       ret = -ENOMEM;
+                       goto err;
+               }
+
+               vid = dev->vdi.vid;
+               oid = vid_to_vdi_oid(vid);
+               idx = data_oid_to_idx(req->oid);
+               new->offset = SD_INODE_HEADER_SIZE + sizeof(vid) * idx;
+               new->length = sizeof(vid);
+               new->oid = oid;
+               new->aiocb = req->aiocb;
+               new->buf = (char *)&vid;
+               new->seq_num = atomic_inc_return(&dev->seq_num);
+               new->type = SHEEP_WRITE;
+               atomic_inc(&req->aiocb->nr_requests);
+               INIT_LIST_HEAD(&new->list);
+
+               /* Make sure no request is queued while we update inode */
+               spin_lock(&dev->vdi_lock);
+               dev->vdi.inode->data_vdi_id[idx] = vid;
+               spin_unlock(&dev->vdi_lock);
+
+               submit_sheep_request(new);
+               submit_blocking_sheep_request(dev, req->oid);
+               /* fall thru */
+       case SHEEP_WRITE:
+       case SHEEP_READ:
+               end_sheep_request(req);
+               break;
+       }
+
        return 0;
+err:
+       return ret;
 }
diff --git a/sbd/sheep_block_device.c b/sbd/sheep_block_device.c
index c2c9dce..b953e36 100644
--- a/sbd/sheep_block_device.c
+++ b/sbd/sheep_block_device.c
@@ -35,21 +35,26 @@ static int sbd_submit_request(struct request *req)
        return sheep_aiocb_submit(aiocb);
 }
 
-static void sbd_request_submiter(struct request_queue *q)
+static void sbd_request_fn(struct request_queue *q)
+__releases(q->queue_lock) __acquires(q->queue_lock)
 {
        struct request *req;
+       struct sbd_device *dev = q->queuedata;
 
        while ((req = blk_fetch_request(q)) != NULL) {
-               int ret;
 
                /* filter out block requests we don't understand */
-               if (req->cmd_type != REQ_TYPE_FS) {
+               if (unlikely(req->cmd_type != REQ_TYPE_FS)) {
                        __blk_end_request_all(req, 0);
                        continue;
                }
-               ret = sbd_submit_request(req);
-               if (ret < 0)
-                       break;
+
+               list_add_tail(&req->queuelist, &dev->request_head);
+               spin_unlock_irq(q->queue_lock);
+
+               wake_up(&dev->submiter_wq);
+
+               spin_lock_irq(q->queue_lock);
        }
 }
 
@@ -68,7 +73,7 @@ static int sbd_add_disk(struct sbd_device *dev)
        disk->fops = &sbd_bd_ops;
        disk->private_data = dev;
 
-       rq = blk_init_queue(sbd_request_submiter, &dev->queue_lock);
+       rq = blk_init_queue(sbd_request_fn, &dev->queue_lock);
        if (!rq) {
                put_disk(disk);
                return -ENOMEM;
@@ -93,16 +98,53 @@ static int sbd_add_disk(struct sbd_device *dev)
 static int sbd_request_reaper(void *data)
 {
        struct sbd_device *dev = data;
+       int ret;
 
        while (!kthread_should_stop() || !list_empty(&dev->inflight_head)) {
-               wait_event_interruptible(dev->inflight_wq,
+               bool empty;
+
+               wait_event_interruptible(dev->reaper_wq,
                                         kthread_should_stop() ||
                                         !list_empty(&dev->inflight_head));
 
-               if (list_empty(&dev->inflight_head))
+               read_lock(&dev->inflight_lock);
+               empty = list_empty(&dev->inflight_head);
+               read_unlock(&dev->inflight_lock);
+
+               if (unlikely(empty))
                        continue;
 
-               sheep_handle_reply(dev);
+               ret = sheep_handle_reply(dev);
+               if (unlikely(ret < 0))
+                       pr_err("reaper: failed to handle reply\n");
+       }
+       return 0;
+}
+
+static int sbd_request_submiter(void *data)
+{
+       struct sbd_device *dev = data;
+       int ret;
+
+       while (!kthread_should_stop() || !list_empty(&dev->request_head)) {
+               struct request *req;
+
+               wait_event_interruptible(dev->submiter_wq,
+                                        kthread_should_stop() ||
+                                        !list_empty(&dev->request_head));
+
+               spin_lock_irq(&dev->queue_lock);
+               if (unlikely(list_empty(&dev->request_head))) {
+                       spin_unlock_irq(&dev->queue_lock);
+                       continue;
+               }
+               req = list_entry_rq(dev->request_head.next);
+               list_del_init(&req->queuelist);
+               spin_unlock_irq(&dev->queue_lock);
+
+               ret = sbd_submit_request(req);
+               if (unlikely(ret < 0))
+                       pr_err("submiter: failed to submit request\n");
        }
        return 0;
 }
@@ -138,9 +180,14 @@ static ssize_t sbd_add(struct bus_type *bus, const char 
*buf,
        }
 
        spin_lock_init(&dev->queue_lock);
+       spin_lock_init(&dev->vdi_lock);
        INIT_LIST_HEAD(&dev->inflight_head);
        INIT_LIST_HEAD(&dev->blocking_head);
-       init_waitqueue_head(&dev->inflight_wq);
+       INIT_LIST_HEAD(&dev->request_head);
+       init_waitqueue_head(&dev->reaper_wq);
+       init_waitqueue_head(&dev->submiter_wq);
+       rwlock_init(&dev->inflight_lock);
+       rwlock_init(&dev->blocking_lock);
 
        list_for_each_entry(tmp, &sbd_dev_list, list) {
                if (tmp->id > new_id)
@@ -159,6 +206,11 @@ static ssize_t sbd_add(struct bus_type *bus, const char 
*buf,
        dev->major = ret;
        dev->minor = 0;
        dev->reaper = kthread_run(sbd_request_reaper, dev, "sbd_reaper");
+       if (IS_ERR(dev->reaper))
+               goto err_unreg_blkdev;
+       dev->submiter = kthread_run(sbd_request_submiter, dev, "sbd_submiter");
+       if (IS_ERR(dev->submiter))
+               goto err_unreg_blkdev;
 
        ret = sbd_add_disk(dev);
        if (ret < 0)
@@ -222,7 +274,9 @@ static ssize_t sbd_remove(struct bus_type *bus, const char 
*buf,
                return -ENOENT;
 
        kthread_stop(dev->reaper);
-       wake_up_interruptible(&dev->inflight_wq);
+       kthread_stop(dev->submiter);
+       wake_up(&dev->reaper_wq);
+       wake_up(&dev->submiter_wq);
 
        sbd_del_disk(dev);
        free_sbd_device(dev);
-- 
1.8.1.2

-- 
sheepdog mailing list
sheepdog@lists.wpkg.org
http://lists.wpkg.org/mailman/listinfo/sheepdog

Reply via email to