On Wed, May 07, 2025 at 03:49:40PM -0600, Uday Shankar wrote:
> Towards the goal of decoupling ublk_queues from ublk server threads,
> move resources/data that should be per-thread rather than per-queue out
> of ublk_queue and into a new struct ublk_thread.
> 
> Signed-off-by: Uday Shankar <ushan...@purestorage.com>
> ---
>  tools/testing/selftests/ublk/kublk.c | 225 ++++++++++++++++++-----------------
>  tools/testing/selftests/ublk/kublk.h |  38 ++++--
>  2 files changed, 145 insertions(+), 118 deletions(-)
> 
> diff --git a/tools/testing/selftests/ublk/kublk.c b/tools/testing/selftests/ublk/kublk.c
> index 3ad9e162816c3a10e9928f9d530908cda7595530..313689f94cd6361a9a0f4b9257085b2a62bc8b8c 100644
> --- a/tools/testing/selftests/ublk/kublk.c
> +++ b/tools/testing/selftests/ublk/kublk.c
> @@ -324,8 +324,8 @@ static void ublk_ctrl_dump(struct ublk_dev *dev)
>  
>               for (i = 0; i < info->nr_hw_queues; i++) {
>                       ublk_print_cpu_set(&affinity[i], buf, sizeof(buf));
> -                     printf("\tqueue %u: tid %d affinity(%s)\n",
> -                                     i, dev->q[i].tid, buf);
> +                     printf("\tqueue %u: affinity(%s)\n",
> +                                     i, buf);
>               }
>               free(affinity);
>       }
> @@ -395,18 +395,16 @@ static void ublk_queue_deinit(struct ublk_queue *q)
>               free(q->ios[i].buf_addr);
>  }
>  
> -static void ublk_thread_deinit(struct ublk_queue *q)
> +static void ublk_thread_deinit(struct ublk_thread *t)
>  {
> -     q->tid = 0;
> +     io_uring_unregister_buffers(&t->ring);
>  
> -     io_uring_unregister_buffers(&q->ring);
> +     io_uring_unregister_ring_fd(&t->ring);
>  
> -     io_uring_unregister_ring_fd(&q->ring);
> -
> -     if (q->ring.ring_fd > 0) {
> -             io_uring_unregister_files(&q->ring);
> -             close(q->ring.ring_fd);
> -             q->ring.ring_fd = -1;
> +     if (t->ring.ring_fd > 0) {
> +             io_uring_unregister_files(&t->ring);
> +             close(t->ring.ring_fd);
> +             t->ring.ring_fd = -1;
>       }
>  }
>  
> @@ -421,7 +419,6 @@ static int ublk_queue_init(struct ublk_queue *q)
>       q->tgt_ops = dev->tgt.ops;
>       q->state = 0;
>       q->q_depth = depth;
> -     q->cmd_inflight = 0;
>  
>       if (dev->dev_info.flags & UBLK_F_SUPPORT_ZERO_COPY) {
>               q->state |= UBLKSRV_NO_BUF;
> @@ -443,6 +440,7 @@ static int ublk_queue_init(struct ublk_queue *q)
>               q->ios[i].buf_addr = NULL;
>               q->ios[i].flags = UBLKSRV_NEED_FETCH_RQ | UBLKSRV_IO_FREE;
>               q->ios[i].q = q;
> +             q->ios[i].tag = i;
>  
>               if (q->state & UBLKSRV_NO_BUF)
>                       continue;
> @@ -463,47 +461,46 @@ static int ublk_queue_init(struct ublk_queue *q)
>       return -ENOMEM;
>  }
>  
> -static int ublk_thread_init(struct ublk_queue *q)
> +static int ublk_thread_init(struct ublk_thread *t)
>  {
> -     struct ublk_dev *dev = q->dev;
> +     struct ublk_dev *dev = t->dev;
>       int ring_depth = dev->tgt.sq_depth, cq_depth = dev->tgt.cq_depth;
>       int ret;
>  
> -     q->tid = gettid();
> -
> -     ret = ublk_setup_ring(&q->ring, ring_depth, cq_depth,
> +     ret = ublk_setup_ring(&t->ring, ring_depth, cq_depth,
>                       IORING_SETUP_COOP_TASKRUN |
>                       IORING_SETUP_SINGLE_ISSUER |
>                       IORING_SETUP_DEFER_TASKRUN);
>       if (ret < 0) {
> -             ublk_err("ublk dev %d queue %d setup io_uring failed %d\n",
> -                             q->dev->dev_info.dev_id, q->q_id, ret);
> +             ublk_err("ublk dev %d thread %d setup io_uring failed %d\n",
> +                             dev->dev_info.dev_id, t->idx, ret);
>               goto fail;
>       }
>  
>       if (dev->dev_info.flags & UBLK_F_SUPPORT_ZERO_COPY) {
> -             ret = io_uring_register_buffers_sparse(&q->ring, q->q_depth);
> +             ret = io_uring_register_buffers_sparse(
> +                     &t->ring, dev->dev_info.queue_depth);
>               if (ret) {
> -                     ublk_err("ublk dev %d queue %d register spare buffers failed %d",
> -                                     dev->dev_info.dev_id, q->q_id, ret);
> +                     ublk_err("ublk dev %d thread %d register spare buffers failed %d",
> +                                     dev->dev_info.dev_id, t->idx, ret);
>                       goto fail;
>               }
>       }
>  
> -     io_uring_register_ring_fd(&q->ring);
> +     io_uring_register_ring_fd(&t->ring);
>  
> -     ret = io_uring_register_files(&q->ring, dev->fds, dev->nr_fds);
> +     ret = io_uring_register_files(&t->ring, dev->fds, dev->nr_fds);
>       if (ret) {
> -             ublk_err("ublk dev %d queue %d register files failed %d\n",
> -                             q->dev->dev_info.dev_id, q->q_id, ret);
> +             ublk_err("ublk dev %d thread %d register files failed %d\n",
> +                             t->dev->dev_info.dev_id, t->idx, ret);
>               goto fail;
>       }
>  
>       return 0;
>  fail:
> -     ublk_thread_deinit(q);
> -     ublk_err("ublk dev %d queue %d thread init failed\n",
> -                     dev->dev_info.dev_id, q->q_id);
> +     ublk_thread_deinit(t);
> +     ublk_err("ublk dev %d thread %d init failed\n",
> +                     dev->dev_info.dev_id, t->idx);
>       return -ENOMEM;
>  }
>  
> @@ -545,8 +542,9 @@ static void ublk_dev_unprep(struct ublk_dev *dev)
>       close(dev->fds[0]);
>  }
>  
> -int ublk_queue_io_cmd(struct ublk_queue *q, struct ublk_io *io, unsigned tag)
> +int ublk_queue_io_cmd(struct ublk_io *io)
>  {
> +     struct ublk_thread *t = io->t;
>       struct ublksrv_io_cmd *cmd;
>       struct io_uring_sqe *sqe[1];
>       unsigned int cmd_op = 0;
> @@ -571,13 +569,13 @@ int ublk_queue_io_cmd(struct ublk_queue *q, struct ublk_io *io, unsigned tag)
>       else if (io->flags & UBLKSRV_NEED_FETCH_RQ)
>               cmd_op = UBLK_U_IO_FETCH_REQ;
>  
> -     if (io_uring_sq_space_left(&q->ring) < 1)
> -             io_uring_submit(&q->ring);
> +     if (io_uring_sq_space_left(&t->ring) < 1)
> +             io_uring_submit(&t->ring);
>  
> -     ublk_io_alloc_sqes(ublk_get_io(q, tag), sqe, 1);
> +     ublk_io_alloc_sqes(io, sqe, 1);
>       if (!sqe[0]) {
> -             ublk_err("%s: run out of sqe %d, tag %d\n",
> -                             __func__, q->q_id, tag);
> +             ublk_err("%s: run out of sqe. thread %u, tag %d\n",
> +                             __func__, t->idx, io->tag);
>               return -1;
>       }
>  
> @@ -592,42 +590,51 @@ int ublk_queue_io_cmd(struct ublk_queue *q, struct ublk_io *io, unsigned tag)
>       sqe[0]->opcode  = IORING_OP_URING_CMD;
>       sqe[0]->flags   = IOSQE_FIXED_FILE;
>       sqe[0]->rw_flags        = 0;
> -     cmd->tag        = tag;
> -     cmd->q_id       = q->q_id;
> -     if (!(q->state & UBLKSRV_NO_BUF))
> +     cmd->tag        = io->tag;
> +     cmd->q_id       = io->q->q_id;
> +     if (!(io->q->state & UBLKSRV_NO_BUF))
>               cmd->addr       = (__u64) (uintptr_t) io->buf_addr;
>       else
>               cmd->addr       = 0;
>  
> -     user_data = build_user_data(tag, _IOC_NR(cmd_op), 0, q->q_id, 0);
> +     user_data = build_user_data(io->tag, _IOC_NR(cmd_op), 0, io->q->q_id, 0);
>       io_uring_sqe_set_data64(sqe[0], user_data);
>  
>       io->flags = 0;
>  
> -     q->cmd_inflight += 1;
> +     t->cmd_inflight += 1;
>  
> -     ublk_dbg(UBLK_DBG_IO_CMD, "%s: (qid %d tag %u cmd_op %u) iof %x stopping %d\n",
> -                     __func__, q->q_id, tag, cmd_op,
> -                     io->flags, !!(q->state & UBLKSRV_QUEUE_STOPPING));
> +     ublk_dbg(UBLK_DBG_IO_CMD, "%s: (thread %u qid %d tag %u cmd_op %u) iof %x stopping %d\n",
> +                     __func__, t->idx, io->q->q_id, io->tag, cmd_op,
> +                     io->flags, !!(t->state & UBLKSRV_THREAD_STOPPING));
>       return 1;
>  }
>  
> -static void ublk_submit_fetch_commands(struct ublk_queue *q)
> +static void ublk_submit_fetch_commands(struct ublk_thread *t)
>  {
> +     /*
> +      * Service exclusively the queue whose q_id matches our thread
> +      * index. This may change in the future.
> +      */
> +     struct ublk_queue *q = &t->dev->q[t->idx];
> +     struct ublk_io *io;
>       int i = 0;
>  
> -     for (i = 0; i < q->q_depth; i++)
> -             ublk_queue_io_cmd(q, &q->ios[i], i);
> +     for (i = 0; i < q->q_depth; i++) {
> +             io = &q->ios[i];
> +             io->t = t;
> +             ublk_queue_io_cmd(io);
> +     }
>  }
>  
> -static int ublk_queue_is_idle(struct ublk_queue *q)
> +static int ublk_thread_is_idle(struct ublk_thread *t)
>  {
> -     return !io_uring_sq_ready(&q->ring) && !q->io_inflight;
> +     return !io_uring_sq_ready(&t->ring) && !t->io_inflight;
>  }
>  
> -static int ublk_queue_is_done(struct ublk_queue *q)
> +static int ublk_thread_is_done(struct ublk_thread *t)
>  {
> -     return (q->state & UBLKSRV_QUEUE_STOPPING) && ublk_queue_is_idle(q);
> +     return (t->state & UBLKSRV_THREAD_STOPPING) && ublk_thread_is_idle(t);
>  }
>  
>  static inline void ublksrv_handle_tgt_cqe(struct ublk_queue *q,
> @@ -645,15 +652,16 @@ static inline void ublksrv_handle_tgt_cqe(struct ublk_queue *q,
>               q->tgt_ops->tgt_io_done(q, tag, cqe);
>  }
>  
> -static void ublk_handle_cqe(struct ublk_dev *dev,
> +static void ublk_handle_cqe(struct ublk_thread *t,
>               struct io_uring_cqe *cqe, void *data)
>  {
> +     struct ublk_dev *dev = t->dev;
>       unsigned q_id = user_data_to_q_id(cqe->user_data);
>       struct ublk_queue *q = &dev->q[q_id];
>       unsigned tag = user_data_to_tag(cqe->user_data);
>       unsigned cmd_op = user_data_to_op(cqe->user_data);
>       int fetch = (cqe->res != UBLK_IO_RES_ABORT) &&
> -             !(q->state & UBLKSRV_QUEUE_STOPPING);
> +             !(t->state & UBLKSRV_THREAD_STOPPING);
>       struct ublk_io *io;
>  
>       if (cqe->res < 0 && cqe->res != -ENODEV)
> @@ -664,7 +672,7 @@ static void ublk_handle_cqe(struct ublk_dev *dev,
>                       __func__, cqe->res, q->q_id, tag, cmd_op,
>                       is_target_io(cqe->user_data),
>                       user_data_to_tgt_data(cqe->user_data),
> -                     (q->state & UBLKSRV_QUEUE_STOPPING));
> +                     (t->state & UBLKSRV_THREAD_STOPPING));
>  
>       /* Don't retrieve io in case of target io */
>       if (is_target_io(cqe->user_data)) {
> @@ -673,10 +681,10 @@ static void ublk_handle_cqe(struct ublk_dev *dev,
>       }
>  
>       io = &q->ios[tag];
> -     q->cmd_inflight--;
> +     t->cmd_inflight--;
>  
>       if (!fetch) {
> -             q->state |= UBLKSRV_QUEUE_STOPPING;
> +             t->state |= UBLKSRV_THREAD_STOPPING;
>               io->flags &= ~UBLKSRV_NEED_FETCH_RQ;
>       }
>  
> @@ -686,7 +694,7 @@ static void ublk_handle_cqe(struct ublk_dev *dev,
>                       q->tgt_ops->queue_io(q, tag);
>       } else if (cqe->res == UBLK_IO_RES_NEED_GET_DATA) {
>               io->flags |= UBLKSRV_NEED_GET_DATA | UBLKSRV_IO_FREE;
> -             ublk_queue_io_cmd(q, io, tag);
> +             ublk_queue_io_cmd(io);
>       } else {
>               /*
>                * COMMIT_REQ will be completed immediately since no fetching
> @@ -700,87 +708,92 @@ static void ublk_handle_cqe(struct ublk_dev *dev,
>       }
>  }
>  
> -static int ublk_reap_events_uring(struct ublk_queue *q)
> +static int ublk_reap_events_uring(struct ublk_thread *t)
>  {
>       struct io_uring_cqe *cqe;
>       unsigned head;
>       int count = 0;
>  
> -     io_uring_for_each_cqe(&q->ring, head, cqe) {
> -             ublk_handle_cqe(q->dev, cqe, NULL);
> +     io_uring_for_each_cqe(&t->ring, head, cqe) {
> +             ublk_handle_cqe(t, cqe, NULL);
>               count += 1;
>       }
> -     io_uring_cq_advance(&q->ring, count);
> +     io_uring_cq_advance(&t->ring, count);
>  
>       return count;
>  }
>  
> -static int ublk_process_io(struct ublk_queue *q)
> +static int ublk_process_io(struct ublk_thread *t)
>  {
>       int ret, reapped;
>  
> -     ublk_dbg(UBLK_DBG_QUEUE, "dev%d-q%d: to_submit %d inflight cmd %u stopping %d\n",
> -                             q->dev->dev_info.dev_id,
> -                             q->q_id, io_uring_sq_ready(&q->ring),
> -                             q->cmd_inflight,
> -                             (q->state & UBLKSRV_QUEUE_STOPPING));
> +     ublk_dbg(UBLK_DBG_THREAD, "dev%d-t%u: to_submit %d inflight cmd %u stopping %d\n",
> +                             t->dev->dev_info.dev_id,
> +                             t->idx, io_uring_sq_ready(&t->ring),
> +                             t->cmd_inflight,
> +                             (t->state & UBLKSRV_THREAD_STOPPING));
>  
> -     if (ublk_queue_is_done(q))
> +     if (ublk_thread_is_done(t))
>               return -ENODEV;
>  
> -     ret = io_uring_submit_and_wait(&q->ring, 1);
> -     reapped = ublk_reap_events_uring(q);
> +     ret = io_uring_submit_and_wait(&t->ring, 1);
> +     reapped = ublk_reap_events_uring(t);
>  
> -     ublk_dbg(UBLK_DBG_QUEUE, "submit result %d, reapped %d stop %d idle %d\n",
> -                     ret, reapped, (q->state & UBLKSRV_QUEUE_STOPPING),
> -                     (q->state & UBLKSRV_QUEUE_IDLE));
> +     ublk_dbg(UBLK_DBG_THREAD, "submit result %d, reapped %d stop %d idle %d\n",
> +                     ret, reapped, (t->state & UBLKSRV_THREAD_STOPPING),
> +                     (t->state & UBLKSRV_THREAD_IDLE));
>  
>       return reapped;
>  }
>  
> -static void ublk_queue_set_sched_affinity(const struct ublk_queue *q,
> +static void ublk_thread_set_sched_affinity(const struct ublk_thread *t,
>               cpu_set_t *cpuset)
>  {
>          if (sched_setaffinity(0, sizeof(*cpuset), cpuset) < 0)
> -                ublk_err("ublk dev %u queue %u set affinity failed",
> -                                q->dev->dev_info.dev_id, q->q_id);
> +             ublk_err("ublk dev %u thread %u set affinity failed",
> +                             t->dev->dev_info.dev_id, t->idx);
>  }
>  
> -struct ublk_queue_info {
> -     struct ublk_queue       *q;
> -     sem_t                   *queue_sem;
> +struct ublk_thread_info {
> +     struct ublk_dev         *dev;
> +     unsigned                idx;
> +     sem_t                   *ready;
>       cpu_set_t               *affinity;
>  };
>  
>  static void *ublk_io_handler_fn(void *data)
>  {
> -     struct ublk_queue_info *info = data;
> -     struct ublk_queue *q = info->q;
> -     int dev_id = q->dev->dev_info.dev_id;
> +     struct ublk_thread_info *info = data;
> +     struct ublk_thread *t = &info->dev->threads[info->idx];
> +     int dev_id = info->dev->dev_info.dev_id;
>       int ret;
>  
> -     ret = ublk_thread_init(q);
> +     t->dev = info->dev;
> +     t->idx = info->idx;
> +
> +     ret = ublk_thread_init(t);
>       if (ret) {
> -             ublk_err("ublk dev %d queue %d thread init failed\n",
> -                             dev_id, q->q_id);
> +             ublk_err("ublk dev %d thread %u init failed\n",
> +                             dev_id, t->idx);
>               return NULL;
>       }
>       /* IO perf is sensitive with queue pthread affinity on NUMA machine*/
> -     ublk_queue_set_sched_affinity(q, info->affinity);
> -     sem_post(info->queue_sem);
> +     ublk_thread_set_sched_affinity(t, info->affinity);
> +     sem_post(info->ready);
>  
> -     ublk_dbg(UBLK_DBG_QUEUE, "tid %d: ublk dev %d queue %d started\n",
> -                     q->tid, dev_id, q->q_id);
> +     ublk_dbg(UBLK_DBG_THREAD, "tid %d: ublk dev %d thread %u started\n",
> +                     gettid(), dev_id, t->idx);
>  
>       /* submit all io commands to ublk driver */
> -     ublk_submit_fetch_commands(q);
> +     ublk_submit_fetch_commands(t);
>       do {
> -             if (ublk_process_io(q) < 0)
> +             if (ublk_process_io(t) < 0)
>                       break;
>       } while (1);
>  
> -     ublk_dbg(UBLK_DBG_QUEUE, "ublk dev %d queue %d exited\n", dev_id, q->q_id);
> -     ublk_thread_deinit(q);
> +     ublk_dbg(UBLK_DBG_THREAD, "tid %d: ublk dev %d thread %d exiting\n",
> +              gettid(), dev_id, t->idx);
> +     ublk_thread_deinit(t);
>       return NULL;
>  }
>  
> @@ -823,20 +836,19 @@ static int ublk_send_dev_event(const struct dev_ctx *ctx, struct ublk_dev *dev,
>  static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
>  {
>       const struct ublksrv_ctrl_dev_info *dinfo = &dev->dev_info;
> -     struct ublk_queue_info *qinfo;
> +     struct ublk_thread_info *tinfo;
>       cpu_set_t *affinity_buf;
>       void *thread_ret;
> -     sem_t queue_sem;
> +     sem_t ready;
>       int ret, i;
>  
>       ublk_dbg(UBLK_DBG_DEV, "%s enter\n", __func__);
>  
> -     qinfo = (struct ublk_queue_info *)calloc(sizeof(struct ublk_queue_info),
> -                     dinfo->nr_hw_queues);
> -     if (!qinfo)
> +     tinfo = calloc(sizeof(struct ublk_thread_info), dinfo->nr_hw_queues);
> +     if (!tinfo)
>               return -ENOMEM;
>  
> -     sem_init(&queue_sem, 0, 0);
> +     sem_init(&ready, 0, 0);
>       ret = ublk_dev_prep(ctx, dev);
>       if (ret)
>               return ret;
> @@ -856,17 +868,18 @@ static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
>                       goto fail;
>               }
>  
> -             qinfo[i].q = &dev->q[i];
> -             qinfo[i].queue_sem = &queue_sem;
> -             qinfo[i].affinity = &affinity_buf[i];
> -             pthread_create(&dev->q[i].thread, NULL,
> +             tinfo[i].dev = dev;
> +             tinfo[i].idx = i;
> +             tinfo[i].ready = &ready;
> +             tinfo[i].affinity = &affinity_buf[i];
> +             pthread_create(&dev->threads[i].thread, NULL,
>                               ublk_io_handler_fn,
> -                             &qinfo[i]);
> +                             &tinfo[i]);
>       }
>  
>       for (i = 0; i < dinfo->nr_hw_queues; i++)
> -             sem_wait(&queue_sem);
> -     free(qinfo);
> +             sem_wait(&ready);
> +     free(tinfo);
>       free(affinity_buf);
>  
>       /* everything is fine now, start us */
> @@ -889,7 +902,7 @@ static int ublk_start_daemon(const struct dev_ctx *ctx, struct ublk_dev *dev)
>  
>       /* wait until we are terminated */
>       for (i = 0; i < dinfo->nr_hw_queues; i++)
> -             pthread_join(dev->q[i].thread, &thread_ret);
> +             pthread_join(dev->threads[i].thread, &thread_ret);
>   fail:
>       for (i = 0; i < dinfo->nr_hw_queues; i++)
>               ublk_queue_deinit(&dev->q[i]);
> diff --git a/tools/testing/selftests/ublk/kublk.h b/tools/testing/selftests/ublk/kublk.h
> index 7c912116606429215af7dbc2a8ce6b40ef89bfbd..9eb2207fcebe96d34488d057c881db262b9767b3 100644
> --- a/tools/testing/selftests/ublk/kublk.h
> +++ b/tools/testing/selftests/ublk/kublk.h
> @@ -51,10 +51,12 @@
>  #define UBLK_IO_MAX_BYTES               (1 << 20)
>  #define UBLK_MAX_QUEUES_SHIFT                5
>  #define UBLK_MAX_QUEUES                 (1 << UBLK_MAX_QUEUES_SHIFT)
> +#define UBLK_MAX_THREADS_SHIFT               5
> +#define UBLK_MAX_THREADS             (1 << UBLK_MAX_THREADS_SHIFT)
>  #define UBLK_QUEUE_DEPTH                1024
>  
>  #define UBLK_DBG_DEV            (1U << 0)
> -#define UBLK_DBG_QUEUE          (1U << 1)
> +#define UBLK_DBG_THREAD         (1U << 1)
>  #define UBLK_DBG_IO_CMD         (1U << 2)
>  #define UBLK_DBG_IO             (1U << 3)
>  #define UBLK_DBG_CTRL_CMD       (1U << 4)
> @@ -62,6 +64,7 @@
>  
>  struct ublk_dev;
>  struct ublk_queue;
> +struct ublk_thread;
>  
>  struct stripe_ctx {
>       /* stripe */
> @@ -120,6 +123,8 @@ struct ublk_io {
>       unsigned short refs;            /* used by target code only */
>  
>       struct ublk_queue *q;
> +     struct ublk_thread *t;

Given that you take a static mapping between queue/tag and thread,
the 'struct ublk_thread' can easily be figured out at runtime, which
would save 8 bytes and avoid an indirect memory dereference.

sizeof(struct ublk_io) needs to fit in a single L1 cacheline.

But that can be a followup.
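
Something like the following untested sketch would do it, assuming the
1:1 queue/thread mapping this patch hardcodes (thread index == q_id);
the helper name is made up:

	/*
	 * Illustrative only: derive the serving thread from the static
	 * queue/thread mapping instead of caching a pointer in ublk_io.
	 */
	static inline struct ublk_thread *ublk_io_to_thread(const struct ublk_io *io)
	{
		/* under the static mapping, q_id doubles as the thread index */
		return &io->q->dev->threads[io->q->q_id];
	}

Then ublk_queue_io_cmd() and friends could start with "struct
ublk_thread *t = ublk_io_to_thread(io);" and the 'struct ublk_thread *t'
field could be dropped from struct ublk_io.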

Reviewed-by: Ming Lei <ming....@redhat.com>


thanks,
Ming

