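Process commands submitted to the mmap backing store in worker threads,
as bs_sync already does. cmd->mmapped is now set by the worker thread
once it has actually mapped the backing file, so the sbc and mmc
read/write handlers no longer set it.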

---
 usr/Makefile  |    1 +
 usr/bs_mmap.c |  301 +++++++++++++++++++++++++++++++++++++++++++++++++++++-----
 usr/mmc.c     |    1 -
 usr/sbc.c     |    1 -
 4 files changed, 277 insertions(+), 27 deletions(-)

Index: b/usr/Makefile
===================================================================
--- a/usr/Makefile
+++ b/usr/Makefile
@@ -29,6 +29,7 @@ ifneq ($(IBMVIO),)
 CFLAGS += -DIBMVIO -DUSE_KERNEL
 TGTD_OBJS += $(addprefix ibmvio/, ibmvio.o)
 TGTD_OBJS += bs_mmap.o tgtif.o
+LIBS += -lpthread
 endif
 
 ifneq ($(ISCSI),)
Index: b/usr/bs_mmap.c
===================================================================
--- a/usr/bs_mmap.c
+++ b/usr/bs_mmap.c
@@ -27,53 +27,303 @@
 #include <string.h>
 #include <unistd.h>
 #include <sys/mman.h>
+#include <pthread.h>
+#include <sys/epoll.h>
+#include <linux/fs.h>
 
 #include "list.h"
 #include "util.h"
 #include "tgtd.h"
 #include "scsi.h"
 
+/* number of worker threads bs_mmap_open() starts for each LU */
+#define NR_WORKER_THREADS      4
+
+/*
+ * Per-LU state of the mmap backing store.  It lives in the bs_datasize
+ * area directly after struct scsi_lu.
+ *
+ * Command flow: bs_mmap_cmd_submit() -> pending_list -> worker thread ->
+ * finished_list -> ack thread -> ack_list -> bs_mmap_handler() in tgtd.
+ */
+struct bs_mmap_info {
+       /* tgtd -> workers: pending_list is protected by pending_lock,
+        * workers sleep on pending_cond */
+       pthread_mutex_t pending_lock;
+       pthread_cond_t pending_cond;
+       struct list_head pending_list;
+
+       /* workers -> ack thread: finished_list is protected by
+        * finished_lock, the ack thread sleeps on finished_cond */
+       pthread_mutex_t finished_lock;
+       pthread_cond_t finished_cond;
+       struct list_head finished_list;
+
+       /*
+        * ack thread -> tgtd.  ack_list has no lock: ownership is passed
+        * with a token on command_fd (tgtd -> ack thread) and a wakeup on
+        * done_fd (ack thread -> tgtd).
+        */
+       struct list_head ack_list;
+       int command_fd[2];
+       int done_fd[2];
+
+       pthread_t ack_thread;
+       pthread_t worker_thread[NR_WORKER_THREADS];
+
+       /* set by bs_mmap_close() to make the worker threads exit */
+       int stop;
+};
+
+/* pthread cleanup handler: release the mutex passed as arg */
+static void bs_mmap_unlock_mutex(void *arg)
+{
+       pthread_mutex_unlock(arg);
+}
+
+/*
+ * Ack thread: moves completed commands from finished_list to ack_list and
+ * wakes tgtd through done_fd.
+ *
+ * ack_list is shared with bs_mmap_handler() without a lock.  This thread
+ * touches it only after reading a token from command_fd, and hands it
+ * back by writing to done_fd; bs_mmap_handler() returns the token once it
+ * has emptied ack_list.
+ *
+ * Exits on a fatal pipe error, on EOF of command_fd, or when cancelled.
+ */
+static void *bs_mmap_ack_fn(void *arg)
+{
+       struct bs_mmap_info *info = arg;
+       int command, ret, nr;
+       struct scsi_cmd *cmd;
+
+retry:
+       ret = read(info->command_fd[0], &command, sizeof(command));
+       if (ret < 0) {
+               /* transient errors: just try again */
+               if (errno == EAGAIN || errno == EINTR)
+                       goto retry;
+
+               eprintf("ack pthread will be dead, %m\n");
+               goto out;
+       }
+       if (ret == 0) {
+               /* write end closed: no token will ever arrive again */
+               eprintf("ack pthread will be dead, command pipe closed\n");
+               goto out;
+       }
+
+       pthread_mutex_lock(&info->finished_lock);
+       /*
+        * pthread_cond_wait() is a cancellation point and re-acquires the
+        * mutex before the thread is torn down.  Without this handler a
+        * pthread_cancel() from bs_mmap_close() would leave finished_lock
+        * held forever, blocking any worker that completes a command.
+        */
+       pthread_cleanup_push(bs_mmap_unlock_mutex, &info->finished_lock);
+
+       while (list_empty(&info->finished_list))
+               pthread_cond_wait(&info->finished_cond, &info->finished_lock);
+
+       while (!list_empty(&info->finished_list)) {
+               cmd = list_entry(info->finished_list.next,
+                                struct scsi_cmd, bs_list);
+
+               dprintf("found %p\n", cmd);
+
+               list_del(&cmd->bs_list);
+               list_add(&cmd->bs_list, &info->ack_list);
+       }
+
+       /* runs bs_mmap_unlock_mutex(), i.e. unlocks finished_lock */
+       pthread_cleanup_pop(1);
+
+       nr = 1;
+rewrite:
+       ret = write(info->done_fd[1], &nr, sizeof(nr));
+       if (ret < 0) {
+               if (errno == EAGAIN || errno == EINTR)
+                       goto rewrite;
+
+               eprintf("can't ack tgtd, %m\n");
+               goto out;
+       }
+
+       goto retry;
+out:
+       return NULL;
+}
+
+/*
+ * Number of pages covered by 'size' bytes starting at file offset 'offset'
+ * (assumes pagesize == 1 << pageshift, as the mmap64() call below does).
+ */
+#define pgcnt(size, offset)    ((((size) + ((offset) & (pagesize - 1))) + \
+                                (pagesize - 1)) >> pageshift)
+
+/*
+ * Worker thread: takes commands off pending_list and does the blocking
+ * part of each request.  For SYNCHRONIZE CACHE it calls fsync(); otherwise
+ * it sets up cmd->uaddr, mmap()ing the backing file when no address is set
+ * yet.  It then records the SCSI status and queues the command on
+ * finished_list for the ack thread.
+ *
+ * Exits once info->stop is set and no queued work is left.
+ */
+static void *bs_mmap_worker_fn(void *arg)
+{
+       struct bs_mmap_info *info = arg;
+       struct scsi_cmd *cmd;
+       int fd, ret;
+       void *p;
+
+       while (1) {
+               pthread_mutex_lock(&info->pending_lock);
+               /*
+                * Test stop before sleeping, not only after waking: a stop
+                * broadcast sent while this thread was busy with a command
+                * would otherwise be missed and the thread never exits.
+                */
+               while (!info->stop && list_empty(&info->pending_list))
+                       pthread_cond_wait(&info->pending_cond,
+                                         &info->pending_lock);
+
+               if (list_empty(&info->pending_list)) {
+                       /* only reachable with info->stop set */
+                       pthread_mutex_unlock(&info->pending_lock);
+                       break;
+               }
+
+               cmd = list_entry(info->pending_list.next,
+                                struct scsi_cmd, bs_list);
+
+               dprintf("got %p\n", cmd);
+
+               list_del(&cmd->bs_list);
+               pthread_mutex_unlock(&info->pending_lock);
+
+               fd = cmd->dev->fd;
+               /*
+                * Reset for every command: the uaddr branch below does not
+                * assign ret, so a previous failure must not leak into it.
+                */
+               ret = 0;
+
+               if (cmd->scb[0] == SYNCHRONIZE_CACHE ||
+                   cmd->scb[0] == SYNCHRONIZE_CACHE_16) {
+                       ret = fsync(fd);
+               } else if (cmd->uaddr) {
+                       cmd->uaddr += cmd->offset;
+               } else {
+                       p = mmap64(NULL,
+                                  pgcnt(cmd->len, cmd->offset) << pageshift,
+                                  PROT_READ | PROT_WRITE,
+                                  MAP_SHARED, fd,
+                                  cmd->offset & ~((1ULL << pageshift) - 1));
+
+                       cmd->uaddr = (unsigned long) p +
+                                    (cmd->offset & (pagesize - 1));
+                       if (p == MAP_FAILED) {
+                               ret = -EINVAL;
+                               eprintf("%" PRIx64 " %u %" PRIu64 "\n",
+                                       cmd->uaddr, cmd->len, cmd->offset);
+                       } else {
+                               /* tells the done path there is a mapping */
+                               cmd->mmapped = 1;
+                       }
+               }
+
+               dprintf("%" PRIx64 " %u %" PRIu64 "\n", cmd->uaddr, cmd->len,
+                       cmd->offset);
+
+               if (ret) {
+                       cmd->rw = READ;
+                       cmd->offset = 0;
+                       cmd->len = 0;
+                       cmd->result = SAM_STAT_CHECK_CONDITION;
+                       sense_data_build(cmd, HARDWARE_ERROR,
+                                        ASC_INTERNAL_TGT_FAILURE);
+               } else {
+                       cmd->result = SAM_STAT_GOOD;
+               }
+
+               pthread_mutex_lock(&info->finished_lock);
+               list_add(&cmd->bs_list, &info->finished_list);
+               pthread_mutex_unlock(&info->finished_lock);
+
+               pthread_cond_signal(&info->finished_cond);
+       }
+
+       return NULL;
+}
+
+/*
+ * Event handler registered on done_fd[0] by bs_mmap_open(); runs in the
+ * tgtd event loop.
+ *
+ * The ack thread writes to done_fd after moving finished commands onto
+ * ack_list.  Complete each of them with target_cmd_io_done(), then write a
+ * token to command_fd so the ack thread may use ack_list again.
+ */
+static void bs_mmap_handler(int fd, int events, void *data)
+{
+       struct bs_mmap_info *info = data;
+       struct scsi_cmd *cmd;
+       int nr_events, ret;
+
+       ret = read(info->done_fd[0], &nr_events, sizeof(nr_events));
+       /* 0 means the write end is gone: there is no wakeup to act on */
+       if (ret <= 0) {
+               eprintf("wrong wakeup\n");
+               return;
+       }
+
+       while (!list_empty(&info->ack_list)) {
+               cmd = list_entry(info->ack_list.next,
+                                struct scsi_cmd, bs_list);
+
+               dprintf("back to tgtd, %p\n", cmd);
+
+               list_del(&cmd->bs_list);
+               target_cmd_io_done(cmd, cmd->result);
+       }
+
+       /* hand ack_list back; if this fails the ack thread stalls */
+       do {
+               ret = write(info->command_fd[1], &nr_events,
+                           sizeof(nr_events));
+       } while (ret < 0 && errno == EINTR);
+       if (ret < 0)
+               eprintf("can't wake up the ack thread, %m\n");
+}
+
 static int bs_mmap_open(struct scsi_lu *lu, char *path, int *fd, uint64_t 
*size)
 {
+       int i, ret;
+       struct bs_mmap_info *info =
+               (struct bs_mmap_info *) ((char *)lu + sizeof(*lu));
+
+       INIT_LIST_HEAD(&info->ack_list);
+       INIT_LIST_HEAD(&info->finished_list);
+       INIT_LIST_HEAD(&info->pending_list);
+
        *fd = backed_file_open(path, O_RDWR| O_LARGEFILE, size);
+       if (*fd < 0)
+               return *fd;
+
+       pthread_cond_init(&info->finished_cond, NULL);
+       pthread_cond_init(&info->pending_cond, NULL);
 
-       return *fd >= 0 ? 0 : *fd;
+       pthread_mutex_init(&info->finished_lock, NULL);
+       pthread_mutex_init(&info->pending_lock, NULL);
+
+       ret = pipe(info->command_fd);
+       if (ret)
+               goto close_dev_fd;
+
+       ret = pipe(info->done_fd);
+       if (ret)
+               goto close_command_fd;
+
+       ret = tgt_event_add(info->done_fd[0], EPOLLIN, bs_mmap_handler, info);
+       if (ret)
+               goto close_done_fd;
+
+       ret = pthread_create(&info->ack_thread, NULL, bs_mmap_ack_fn, info);
+       if (ret)
+               goto event_del;
+
+       for (i = 0; i < ARRAY_SIZE(info->worker_thread); i++) {
+               ret = pthread_create(&info->worker_thread[i], NULL,
+                                    bs_mmap_worker_fn, info);
+       }
+
+       write(info->command_fd[1], &ret, sizeof(ret));
+
+       return 0;
+event_del:
+       tgt_event_del(info->done_fd[0]);
+close_done_fd:
+       close(info->done_fd[0]);
+       close(info->done_fd[1]);
+close_command_fd:
+       close(info->command_fd[0]);
+       close(info->command_fd[1]);
+close_dev_fd:
+       close(*fd);
+       pthread_cond_destroy(&info->finished_cond);
+       pthread_cond_destroy(&info->pending_cond);
+       pthread_mutex_destroy(&info->finished_lock);
+       pthread_mutex_destroy(&info->pending_lock);
+
+       return -1;
 }
 
 static void bs_mmap_close(struct scsi_lu *lu)
 {
+       int i;
+       struct bs_mmap_info *info =
+               (struct bs_mmap_info *) ((char *)lu + sizeof(*lu));
+
+       pthread_cancel(info->ack_thread);
+       pthread_join(info->ack_thread, NULL);
+
+       info->stop = 1;
+       pthread_cond_broadcast(&info->pending_cond);
+
+       for (i = 0; i < ARRAY_SIZE(info->worker_thread); i++)
+               pthread_join(info->worker_thread[i], NULL);
+
+       pthread_cond_destroy(&info->finished_cond);
+       pthread_cond_destroy(&info->pending_cond);
+
+       pthread_mutex_destroy(&info->finished_lock);
+       pthread_mutex_destroy(&info->pending_lock);
+
        close(lu->fd);
 }
 
-#define pgcnt(size, offset)    ((((size) + ((offset) & (pagesize - 1))) + (pagesize - 1)) >> pageshift)
-
 static int bs_mmap_cmd_submit(struct scsi_cmd *cmd)
 {
-       int fd = cmd->dev->fd, ret = 0;
-       void *p;
+       struct scsi_lu *lu = cmd->dev;
+       struct bs_mmap_info *info =
+               (struct bs_mmap_info *)((char *)lu + sizeof(*lu));
 
-       if (cmd->scb[0] == SYNCHRONIZE_CACHE ||
-           cmd->scb[0] == SYNCHRONIZE_CACHE_16)
-               return fsync(fd);
-
-       if (cmd->uaddr)
-               cmd->uaddr += cmd->offset;
-       else {
-               p = mmap64(NULL, pgcnt(cmd->len, cmd->offset) << pageshift,
-                          PROT_READ | PROT_WRITE, MAP_SHARED, fd,
-                          cmd->offset & ~((1ULL << pageshift) - 1));
-
-               cmd->uaddr = (unsigned long) p + (cmd->offset & (pagesize - 1));
-               if (p == MAP_FAILED) {
-                       ret = -EINVAL;
-                       eprintf("%" PRIx64 " %u %" PRIu64 "\n", cmd->uaddr,
-                               cmd->len, cmd->offset);
-               }
-       }
+       dprintf("%d %d %u %"  PRIx64 " %" PRIx64 " %p\n", lu->fd, cmd->rw,
+               cmd->len, cmd->uaddr, cmd->offset, cmd);
+
+       pthread_mutex_lock(&info->pending_lock);
+
+       list_add(&cmd->bs_list, &info->pending_list);
+
+       pthread_mutex_unlock(&info->pending_lock);
+       pthread_cond_signal(&info->pending_cond);
 
-       dprintf("%" PRIx64 " %u %" PRIu64 "\n", cmd->uaddr, cmd->len, 
cmd->offset);
+       cmd->async = 1;
 
-       return ret;
+       return 0;
 }
 
 static int bs_mmap_cmd_done(struct scsi_cmd *cmd)
@@ -97,6 +347,7 @@ static int bs_mmap_cmd_done(struct scsi_
 }
 
 struct backingstore_template mmap_bst = {
+       .bs_datasize            = sizeof(struct bs_mmap_info),
        .bs_open                = bs_mmap_open,
        .bs_close               = bs_mmap_close,
        .bs_cmd_submit          = bs_mmap_cmd_submit,
Index: b/usr/mmc.c
===================================================================
--- a/usr/mmc.c
+++ b/usr/mmc.c
@@ -58,7 +58,6 @@ static int mmc_rw(int host_no, struct sc
                sense_data_build(cmd, ILLEGAL_REQUEST, ASC_LUN_NOT_SUPPORTED);
                return SAM_STAT_CHECK_CONDITION;
        } else {
-               cmd->mmapped = 1;
                return SAM_STAT_GOOD;
        }
        return 0;
Index: b/usr/sbc.c
===================================================================
--- a/usr/sbc.c
+++ b/usr/sbc.c
@@ -70,7 +70,6 @@ static int sbc_rw(int host_no, struct sc
                key = HARDWARE_ERROR;
                asc = ASC_INTERNAL_TGT_FAILURE;
        } else {
-               cmd->mmapped = 1;
                return SAM_STAT_GOOD;
        }
 
_______________________________________________
Stgt-devel mailing list
[email protected]
https://lists.berlios.de/mailman/listinfo/stgt-devel

Reply via email to