From: Liu Yuan <tailai...@taobao.com>

- change the aio framework
  -- give the submitter its own dedicated kthread to avoid lockups
- flesh out sheep_handle_reply and sheep_aiocb_submit
Now we have two kthreads: one for request submission and the other for reply
handling. The tricky part is that sheep_submit_sdreq() can be called from both
kthreads, so we add a mutex to keep the sent data from being interleaved;
interleaving would cause a very tricky bug where the receiving sheep gets a
corrupted sd_req structure and replies with an equally wrong sd_rsp.

The main problems the aio framework has to solve are:

- split a striding request into individual sheep requests, one per object.
- if an object is not created yet, send only one create request and block the
  other requests to that object until the creation succeeds, then send the
  blocked requests.

To get the best performance:

 # echo 4096 > /sys/block/sbd0/queue/max_sectors_kb

which means the IO scheduler will try its best to hand us 4MB requests.

Signed-off-by: Liu Yuan <namei.u...@gmail.com>
---
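Not part of the patch, just a standalone userspace sketch of the first point
above: how a request that strides object boundaries is chopped into one piece
per data object. SD_DATA_OBJ_SIZE is assumed to be 4MB here.

#include <stdint.h>
#include <stdio.h>

#define SD_DATA_OBJ_SIZE (4ULL << 20)	/* assumed 4MB data objects */

/* Print the per-object pieces of a request covering (offset, total). */
static void split_request(uint64_t offset, uint64_t total)
{
	while (total > 0) {
		uint64_t idx = offset / SD_DATA_OBJ_SIZE;	/* data object index */
		uint64_t start = offset % SD_DATA_OBJ_SIZE;	/* offset inside the object */
		uint64_t len = SD_DATA_OBJ_SIZE - start;	/* room left in this object */

		if (len > total)
			len = total;
		printf("object %llu: start %llu, len %llu\n",
		       (unsigned long long)idx,
		       (unsigned long long)start,
		       (unsigned long long)len);
		offset += len;
		total -= len;
	}
}

int main(void)
{
	/* A 4MB request starting 1MB into object 3 spans objects 3 and 4. */
	split_request(3 * SD_DATA_OBJ_SIZE + (1ULL << 20), 4ULL << 20);
	return 0;
}

The do-while loop in sheep_aiocb_submit() below does the same arithmetic,
except that it allocates a sheep_request for each piece instead of printing it.
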
 sbd/sbd.h                |  16 ++-
 sbd/sheep.c              | 261 +++++++++++++++++++++++++++++++++++++++++------
 sbd/sheep_block_device.c |  78 +++++++++++---
 3 files changed, 306 insertions(+), 49 deletions(-)
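Another illustration-only sketch (the object states and helper names are made
up; only the opcodes match the patch): the create/block protocol for a single
not-yet-created object. The first write becomes the create, later writes are
parked, and the create reply unparks them.

#include <stdio.h>

enum obj_state { OBJ_ABSENT, OBJ_CREATING, OBJ_CREATED };

static enum obj_state state = OBJ_ABSENT;
static int blocked;			/* writes parked until the create reply */

/* What the submit path decides for a write to this object. */
static void submit_write(int seq)
{
	switch (state) {
	case OBJ_ABSENT:		/* first writer issues the create */
		state = OBJ_CREATING;
		printf("req %d: SD_OP_CREATE_AND_WRITE_OBJ\n", seq);
		break;
	case OBJ_CREATING:		/* later writers are parked */
		blocked++;
		printf("req %d: blocked until creation succeeds\n", seq);
		break;
	case OBJ_CREATED:		/* normal path */
		printf("req %d: SD_OP_WRITE_OBJ\n", seq);
		break;
	}
}

/* What the reply path does when the create completes. */
static void handle_create_reply(void)
{
	state = OBJ_CREATED;		/* inode now maps this object */
	printf("create done, resubmitting %d blocked request(s)\n", blocked);
	for (; blocked > 0; blocked--)
		printf("resubmit as SD_OP_WRITE_OBJ\n");
}

int main(void)
{
	submit_write(1);		/* becomes the create */
	submit_write(2);		/* parked on the blocking list */
	submit_write(3);		/* parked on the blocking list */
	handle_create_reply();		/* unblocks 2 and 3 */
	submit_write(4);		/* plain write */
	return 0;
}

In the patch this corresponds to blocking_head and find_inflight_request_oid()
in sheep_aiocb_submit(), and to submit_blocking_sheep_request() called from
sheep_handle_reply().
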
diff --git a/sbd/sbd.h b/sbd/sbd.h
index a2252ff..dbabc7a 100644
--- a/sbd/sbd.h
+++ b/sbd/sbd.h
@@ -43,13 +43,19 @@ struct sbd_device {
 	spinlock_t queue_lock;          /* request queue lock */
 
 	struct sheep_vdi vdi;           /* Associated sheep image */
+	spinlock_t vdi_lock;
 
-	struct list_head inflight_head;
-	wait_queue_head_t inflight_wq;
-	struct list_head blocking_head;
+	struct list_head request_head;  /* protected by queue lock */
+	struct list_head inflight_head; /* for inflight sheep requests */
+	struct list_head blocking_head; /* for blocking sheep requests */
+	rwlock_t inflight_lock;
+	rwlock_t blocking_lock;
 
 	struct list_head list;
 	struct task_struct *reaper;
+	struct task_struct *submiter;
+	wait_queue_head_t reaper_wq;
+	wait_queue_head_t submiter_wq;
 };
 
 struct sheep_aiocb {
@@ -57,10 +63,10 @@
 	u64 offset;
 	u64 length;
 	int ret;
-	u32 nr_requests;
+	atomic_t nr_requests;
 	char *buf;
 	int buf_iter;
-	void (*aio_done_func)(struct sheep_aiocb *, bool);
+	void (*aio_done_func)(struct sheep_aiocb *);
 };
 
 enum sheep_request_type {
diff --git a/sbd/sheep.c b/sbd/sheep.c
index 33269b4..bcc65f0 100644
--- a/sbd/sheep.c
+++ b/sbd/sheep.c
@@ -11,12 +11,25 @@
 #include "sbd.h"
 
+/* FIXME I need this hack to compile DEFINE_MUTEX successfully */
+#ifdef __SPIN_LOCK_UNLOCKED
+# undef __SPIN_LOCK_UNLOCKED
+# define __SPIN_LOCK_UNLOCKED(lockname) __SPIN_LOCK_INITIALIZER(lockname)
+#endif
+
+static DEFINE_MUTEX(socket_mutex);
+
 void socket_shutdown(struct socket *sock)
 {
 	if (sock)
 		kernel_sock_shutdown(sock, SHUT_RDWR);
 }
 
+static struct sbd_device *sheep_request_to_device(struct sheep_request *req)
+{
+	return req->aiocb->request->q->queuedata;
+}
+
 static struct sbd_device *sheep_aiocb_to_device(struct sheep_aiocb *aiocb)
 {
 	return aiocb->request->q->queuedata;
@@ -49,7 +62,7 @@ static int socket_create(struct socket **sock, const char *ip_addr, int port)
 			       (char *)&nodelay, sizeof(nodelay));
 	set_fs(oldmm);
 	if (ret != 0) {
-		pr_err("Can't set SO_LINGER: %d\n", ret);
+		pr_err("Can't set nodelay: %d\n", ret);
 		goto shutdown;
 	}
 
@@ -129,14 +142,20 @@ static int socket_write(struct socket *sock, void *buf, int len)
 static int sheep_submit_sdreq(struct socket *sock, struct sd_req *hdr,
 			      void *data, unsigned int wlen)
 {
-	int ret = socket_write(sock, hdr, sizeof(*hdr));
+	int ret;
 
+	/* Make sheep_submit_sdreq thread safe */
+	mutex_lock(&socket_mutex);
+
+	ret = socket_write(sock, hdr, sizeof(*hdr));
 	if (ret < 0)
-		return ret;
+		goto out;
 
 	if (wlen)
-		return socket_write(sock, data, wlen);
-	return 0;
+		ret = socket_write(sock, data, wlen);
+out:
+	mutex_unlock(&socket_mutex);
+	return ret;
 }
 
 /* Run the request synchronously */
@@ -252,33 +271,58 @@ out:
 
 static void submit_sheep_request(struct sheep_request *req)
 {
+	struct sd_req hdr = {};
+	struct sbd_device *dev = sheep_request_to_device(req);
+
+	hdr.id = req->seq_num;
+	hdr.data_length = req->length;
+	hdr.obj.oid = req->oid;
+	hdr.obj.offset = req->offset;
+
+	write_lock(&dev->inflight_lock);
+	BUG_ON(!list_empty(&req->list));
+	list_add_tail(&req->list, &dev->inflight_head);
+	write_unlock(&dev->inflight_lock);
+
+	switch (req->type) {
+	case SHEEP_CREATE:
+	case SHEEP_WRITE:
+		if (req->type == SHEEP_CREATE)
+			hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
+		else
+			hdr.opcode = SD_OP_WRITE_OBJ;
+		hdr.flags = SD_FLAG_CMD_WRITE | SD_FLAG_CMD_DIRECT;
+		sheep_submit_sdreq(dev->sock, &hdr, req->buf, req->length);
+		break;
+	case SHEEP_READ:
+		hdr.opcode = SD_OP_READ_OBJ;
+		sheep_submit_sdreq(dev->sock, &hdr, NULL, 0);
+		break;
+	}
+	sbd_debug("add oid %llx off %d, len %d, seq %u\n", req->oid,
+		  req->offset, req->length, req->seq_num);
+	wake_up(&dev->reaper_wq);
 }
 
 static inline void free_sheep_aiocb(struct sheep_aiocb *aiocb)
 {
-	kfree(aiocb->buf);
+	vfree(aiocb->buf);
 	kfree(aiocb);
 }
 
-static void aio_write_done(struct sheep_aiocb *aiocb, bool locked)
+static void aio_write_done(struct sheep_aiocb *aiocb)
 {
-	sbd_debug("off %llu, len %llu\n", aiocb->offset, aiocb->length);
+	sbd_debug("wdone off %llu, len %llu\n", aiocb->offset, aiocb->length);
 
-	if (locked)
-		__blk_end_request_all(aiocb->request, aiocb->ret);
-	else
-		blk_end_request_all(aiocb->request, aiocb->ret);
+	blk_end_request_all(aiocb->request, aiocb->ret);
 	free_sheep_aiocb(aiocb);
 }
 
-static void aio_read_done(struct sheep_aiocb *aiocb, bool locked)
+static void aio_read_done(struct sheep_aiocb *aiocb)
 {
-	sbd_debug("off %llu, len %llu\n", aiocb->offset, aiocb->length);
+	sbd_debug("rdone off %llu, len %llu\n", aiocb->offset, aiocb->length);
 
-	if (locked)
-		__blk_end_request_all(aiocb->request, aiocb->ret);
-	else
-		blk_end_request_all(aiocb->request, aiocb->ret);
+	blk_end_request_all(aiocb->request, aiocb->ret);
 	free_sheep_aiocb(aiocb);
 }
@@ -294,11 +338,16 @@ struct sheep_aiocb *sheep_aiocb_setup(struct request *req)
 	aiocb->offset = blk_rq_pos(req) * SECTOR_SIZE;
 	aiocb->length = blk_rq_bytes(req);
-	aiocb->nr_requests = 0;
 	aiocb->ret = 0;
 	aiocb->buf_iter = 0;
 	aiocb->request = req;
-	aiocb->buf = kzalloc(aiocb->length, GFP_KERNEL);
+	aiocb->buf = vzalloc(aiocb->length);
+	atomic_set(&aiocb->nr_requests, 0);
+
+	if (!aiocb->buf) {
+		kfree(aiocb);
+		return ERR_PTR(-ENOMEM);
+	}
 
 	switch (rq_data_dir(req)) {
 	case WRITE:
@@ -343,6 +392,7 @@ static struct sheep_request *alloc_sheep_request(struct sheep_aiocb *aiocb,
 	req->aiocb = aiocb;
 	req->buf = aiocb->buf + aiocb->buf_iter;
 	req->seq_num = atomic_inc_return(&dev->seq_num);
+	INIT_LIST_HEAD(&req->list);
 
 	switch (rq_data_dir(aiocb->request)) {
 	case WRITE:
@@ -359,23 +409,51 @@ static struct sheep_request *alloc_sheep_request(struct sheep_aiocb *aiocb,
 	}
 
 	aiocb->buf_iter += len;
-	aiocb->nr_requests++;
+	atomic_inc(&aiocb->nr_requests);
 
 	return req;
 }
 
-static void end_sheep_request(struct sheep_request *req, bool queue_locked)
+static void end_sheep_request(struct sheep_request *req)
 {
 	struct sheep_aiocb *aiocb = req->aiocb;
 
-	if (--aiocb->nr_requests == 0)
-		aiocb->aio_done_func(aiocb, queue_locked);
-
 	sbd_debug("end oid %llx off %d, len %d, seq %u\n", req->oid,
 		  req->offset, req->length, req->seq_num);
+
+	if (atomic_dec_return(&aiocb->nr_requests) <= 0)
+		aiocb->aio_done_func(aiocb);
+	BUG_ON(!list_empty(&req->list));
 	kfree(req);
 }
 
+static struct sheep_request *find_inflight_request_oid(struct sbd_device *dev,
+						       uint64_t oid)
+{
+	struct sheep_request *req;
+
+	read_lock(&dev->inflight_lock);
+	list_for_each_entry(req, &dev->inflight_head, list) {
+		if (req->oid == oid) {
+			read_unlock(&dev->inflight_lock);
+			return req;
+		}
+	}
+	read_unlock(&dev->inflight_lock);
+	return NULL;
+}
+
+static bool sheep_inode_has_idx(struct sbd_device *dev, u32 idx)
+{
+	spin_lock(&dev->vdi_lock);
+	if (dev->vdi.inode->data_vdi_id[idx]) {
+		spin_unlock(&dev->vdi_lock);
+		return true;
+	}
+	spin_unlock(&dev->vdi_lock);
+	return false;
+}
+
 int sheep_aiocb_submit(struct sheep_aiocb *aiocb)
 {
 	struct sbd_device *dev = sheep_aiocb_to_device(aiocb);
@@ -384,35 +462,59 @@ int sheep_aiocb_submit(struct sheep_aiocb *aiocb)
 	u64 start = offset % SD_DATA_OBJ_SIZE;
 	u32 vid = dev->vdi.vid;
 	u64 oid = vid_to_data_oid(vid, offset / SD_DATA_OBJ_SIZE);
-	u32 idx = data_oid_to_idx(oid);
 	int len = SD_DATA_OBJ_SIZE - start;
 
 	if (total < len)
 		len = total;
 
-	sbd_debug("submit oid %llx off %llu, len %llu\n", oid, offset, total);
+	sbd_debug("submit off %llu, len %llu\n", offset, total);
 
 	/*
 	 * Make sure we don't free the aiocb before we are done with all
 	 * requests.This additional reference is dropped at the end of this
	 * function.
	 */
-	aiocb->nr_requests++;
+	atomic_inc(&aiocb->nr_requests);
 
 	do {
 		struct sheep_request *req;
+		u32 idx = data_oid_to_idx(oid);
 
 		req = alloc_sheep_request(aiocb, oid, len, start);
 		if (IS_ERR(req))
 			return PTR_ERR(req);
 
-		if (likely(dev->vdi.inode->data_vdi_id[idx]))
+		if (likely(sheep_inode_has_idx(dev, idx)))
 			goto submit;
 
 		/* Object is not created yet... */
 		switch (req->type) {
 		case SHEEP_WRITE:
+			/*
+			 * Sheepdog can't handle concurrent creation on the same
+			 * object. We send one create req first and then send
+			 * write reqs in next.
+			 */
+			if (find_inflight_request_oid(dev, oid)) {
+				write_lock(&dev->blocking_lock);
+				/*
+				 * There are slim chance object was created
+				 * before we grab blocking_lock
+				 */
+				if (unlikely(sheep_inode_has_idx(dev, idx))) {
+					write_unlock(&dev->blocking_lock);
+					goto submit;
+				}
+				list_add_tail(&req->list, &dev->blocking_head);
+				sbd_debug("block oid %llx off %d, len %d,"
+					  " seq %u\n", req->oid, req->offset,
+					  req->length, req->seq_num);
+				write_unlock(&dev->blocking_lock);
+				goto done;
+			}
+			req->type = SHEEP_CREATE;
+			break;
 		case SHEEP_READ:
-			end_sheep_request(req, true);
+			end_sheep_request(req);
 			goto done;
 		}
 submit:
@@ -424,13 +526,108 @@ done:
 		len = total > SD_DATA_OBJ_SIZE ? SD_DATA_OBJ_SIZE : total;
 	} while (total > 0);
 
-	if (--aiocb->nr_requests == 0)
-		aiocb->aio_done_func(aiocb, true);
+	if (atomic_dec_return(&aiocb->nr_requests) <= 0)
+		aiocb->aio_done_func(aiocb);
 
 	return 0;
 }
 
+static struct sheep_request *fetch_inflight_request(struct sbd_device *dev,
+						    u32 seq_num)
+{
+	struct sheep_request *req, *t;
+
+	write_lock(&dev->inflight_lock);
+	list_for_each_entry_safe(req, t, &dev->inflight_head, list) {
+		if (req->seq_num == seq_num) {
+			list_del_init(&req->list);
+			goto out;
+		}
+	}
+	req = NULL;
+out:
+	write_unlock(&dev->inflight_lock);
+	return req;
+}
+
+static void submit_blocking_sheep_request(struct sbd_device *dev, uint64_t oid)
+{
+	struct sheep_request *req, *t;
+
+	write_lock(&dev->blocking_lock);
+	list_for_each_entry_safe(req, t, &dev->blocking_head, list) {
+		if (req->oid != oid)
+			continue;
+		list_del_init(&req->list);
+		submit_sheep_request(req);
+	}
+	write_unlock(&dev->blocking_lock);
+}
+
 int sheep_handle_reply(struct sbd_device *dev)
 {
+	struct sd_rsp rsp = {};
+	struct sheep_request *req, *new;
+	uint32_t vid, idx;
+	uint64_t oid;
+	int ret;
+
+	ret = socket_read(dev->sock, (char *)&rsp, sizeof(rsp));
+	if (ret < 0) {
+		pr_err("failed to read reply header\n");
+		goto err;
+	}
+
+	req = fetch_inflight_request(dev, rsp.id);
+	if (!req) {
+		pr_err("failed to find req %u\n", rsp.id);
+		return 0;
+	}
+	if (rsp.data_length > 0) {
+		ret = socket_read(dev->sock, req->buf, req->length);
+		if (ret < 0) {
+			pr_err("failed to read reply payload\n");
+			goto err;
+		}
+	}
+
+	switch (req->type) {
+	case SHEEP_CREATE:
+		/* We need to update inode for create */
+		new = kmalloc(sizeof(*new), GFP_KERNEL);
+		if (!new) {
+			ret = -ENOMEM;
+			goto err;
+		}
+
+		vid = dev->vdi.vid;
+		oid = vid_to_vdi_oid(vid);
+		idx = data_oid_to_idx(req->oid);
+		new->offset = SD_INODE_HEADER_SIZE + sizeof(vid) * idx;
+		new->length = sizeof(vid);
+		new->oid = oid;
+		new->aiocb = req->aiocb;
+		new->buf = (char *)&vid;
+		new->seq_num = atomic_inc_return(&dev->seq_num);
+		new->type = SHEEP_WRITE;
+		atomic_inc(&req->aiocb->nr_requests);
+		INIT_LIST_HEAD(&new->list);
+
+		/* Make sure no request is queued while we update inode */
+		spin_lock(&dev->vdi_lock);
+		dev->vdi.inode->data_vdi_id[idx] = vid;
+		spin_unlock(&dev->vdi_lock);
+
+		submit_sheep_request(new);
+		submit_blocking_sheep_request(dev, req->oid);
+		/* fall thru */
+	case SHEEP_WRITE:
+	case SHEEP_READ:
+		end_sheep_request(req);
+		break;
+	}
+	return 0;
+err:
+	return ret;
 }
diff --git a/sbd/sheep_block_device.c b/sbd/sheep_block_device.c
index c2c9dce..b953e36 100644
--- a/sbd/sheep_block_device.c
+++ b/sbd/sheep_block_device.c
@@ -35,21 +35,26 @@ static int sbd_submit_request(struct request *req)
 	return sheep_aiocb_submit(aiocb);
 }
 
-static void sbd_request_submiter(struct request_queue *q)
+static void sbd_request_fn(struct request_queue *q)
+__releases(q->queue_lock) __acquires(q->queue_lock)
 {
 	struct request *req;
+	struct sbd_device *dev = q->queuedata;
 
 	while ((req = blk_fetch_request(q)) != NULL) {
-		int ret;
 
 		/* filter out block requests we don't understand */
-		if (req->cmd_type != REQ_TYPE_FS) {
+		if (unlikely(req->cmd_type != REQ_TYPE_FS)) {
 			__blk_end_request_all(req, 0);
 			continue;
 		}
-		ret = sbd_submit_request(req);
-		if (ret < 0)
-			break;
+
+		list_add_tail(&req->queuelist, &dev->request_head);
+		spin_unlock_irq(q->queue_lock);
+
+		wake_up(&dev->submiter_wq);
+
+		spin_lock_irq(q->queue_lock);
 	}
 }
 
@@ -68,7 +73,7 @@ static int sbd_add_disk(struct sbd_device *dev)
 	disk->fops = &sbd_bd_ops;
 	disk->private_data = dev;
-	rq = blk_init_queue(sbd_request_submiter, &dev->queue_lock);
+	rq = blk_init_queue(sbd_request_fn, &dev->queue_lock);
 	if (!rq) {
 		put_disk(disk);
 		return -ENOMEM;
 	}
@@ -93,16 +98,53 @@ static int sbd_add_disk(struct sbd_device *dev)
 static int sbd_request_reaper(void *data)
 {
 	struct sbd_device *dev = data;
+	int ret;
 
 	while (!kthread_should_stop() || !list_empty(&dev->inflight_head)) {
-		wait_event_interruptible(dev->inflight_wq,
+		bool empty;
+
+		wait_event_interruptible(dev->reaper_wq,
					 kthread_should_stop() ||
					 !list_empty(&dev->inflight_head));
-		if (list_empty(&dev->inflight_head))
+		read_lock(&dev->inflight_lock);
+		empty = list_empty(&dev->inflight_head);
+		read_unlock(&dev->inflight_lock);
+
+		if (unlikely(empty))
 			continue;
 
-		sheep_handle_reply(dev);
+		ret = sheep_handle_reply(dev);
+		if (unlikely(ret < 0))
+			pr_err("reaper: failed to handle reply\n");
+	}
+	return 0;
+}
+
+static int sbd_request_submiter(void *data)
+{
+	struct sbd_device *dev = data;
+	int ret;
+
+	while (!kthread_should_stop() || !list_empty(&dev->request_head)) {
+		struct request *req;
+
+		wait_event_interruptible(dev->submiter_wq,
+					 kthread_should_stop() ||
+					 !list_empty(&dev->request_head));
+
+		spin_lock_irq(&dev->queue_lock);
+		if (unlikely(list_empty(&dev->request_head))) {
+			spin_unlock_irq(&dev->queue_lock);
+			continue;
+		}
+		req = list_entry_rq(dev->request_head.next);
+		list_del_init(&req->queuelist);
+		spin_unlock_irq(&dev->queue_lock);
+
+		ret = sbd_submit_request(req);
+		if (unlikely(ret < 0))
+			pr_err("submiter: failed to submit request\n");
 	}
 	return 0;
 }
@@ -138,9 +180,14 @@ static ssize_t sbd_add(struct bus_type *bus, const char *buf,
 	}
 	spin_lock_init(&dev->queue_lock);
+	spin_lock_init(&dev->vdi_lock);
 	INIT_LIST_HEAD(&dev->inflight_head);
 	INIT_LIST_HEAD(&dev->blocking_head);
-	init_waitqueue_head(&dev->inflight_wq);
+	INIT_LIST_HEAD(&dev->request_head);
+	init_waitqueue_head(&dev->reaper_wq);
+	init_waitqueue_head(&dev->submiter_wq);
+	rwlock_init(&dev->inflight_lock);
+	rwlock_init(&dev->blocking_lock);
 
 	list_for_each_entry(tmp, &sbd_dev_list, list) {
 		if (tmp->id > new_id)
@@ -159,6 +206,11 @@ static ssize_t sbd_add(struct bus_type *bus, const char *buf,
 	dev->major = ret;
 	dev->minor = 0;
 	dev->reaper = kthread_run(sbd_request_reaper, dev, "sbd_reaper");
+	if (IS_ERR(dev->reaper))
+		goto err_unreg_blkdev;
+	dev->submiter = kthread_run(sbd_request_submiter, dev, "sbd_submiter");
+	if (IS_ERR(dev->submiter))
+		goto err_unreg_blkdev;
 
 	ret = sbd_add_disk(dev);
 	if (ret < 0)
@@ -222,7 +274,9 @@ static ssize_t sbd_remove(struct bus_type *bus, const char *buf,
 		return -ENOENT;
 
 	kthread_stop(dev->reaper);
-	wake_up_interruptible(&dev->inflight_wq);
+	kthread_stop(dev->submiter);
+	wake_up(&dev->reaper_wq);
+	wake_up(&dev->submiter_wq);
 
 	sbd_del_disk(dev);
 	free_sbd_device(dev);
-- 
1.8.1.2

-- 
sheepdog mailing list
sheepdog@lists.wpkg.org
http://lists.wpkg.org/mailman/listinfo/sheepdog