On Wed, May 28, 2025 at 03:09:14PM -0400, Stefan Hajnoczi wrote:
> Introduce the aio_add_sqe() API for submitting io_uring requests in the
> current AioContext. This allows other components in QEMU, like the block
> layer, to take advantage of io_uring features without creating their own
> io_uring context.
>
> This API supports nested event loops just like file descriptor
> monitoring and BHs do. This comes at a complexity cost: a BH is required
> to dispatch CQE callbacks and they are placed on a list so that a nested
> event loop can invoke its parent's pending CQE callbacks. If you're
> wondering why CqeHandler exists instead of just a callback function
> pointer, this is why.
>
> Signed-off-by: Stefan Hajnoczi <stefa...@redhat.com>
> ---
Large patch.  I found a couple of nits, but the overall design looks
sound.

Reviewed-by: Eric Blake <ebl...@redhat.com>

>  include/block/aio.h   |  82 ++++++++++++++++++++++++
>  util/aio-posix.h      |   1 +
>  util/aio-posix.c      |   9 +++
>  util/fdmon-io_uring.c | 145 +++++++++++++++++++++++++++++++-----------
>  4 files changed, 200 insertions(+), 37 deletions(-)
>
> diff --git a/include/block/aio.h b/include/block/aio.h
> index d919d7c8f4..95beef28c3 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -61,6 +61,27 @@ typedef struct LuringState LuringState;
>  /* Is polling disabled? */
>  bool aio_poll_disabled(AioContext *ctx);
>
> +#ifdef CONFIG_LINUX_IO_URING
> +/*
> + * Each io_uring request must have a unique CqeHandler that processes the cqe.
> + * The lifetime of a CqeHandler must be at least from aio_add_sqe() until
> + * ->cb() invocation.
> + */
> +typedef struct CqeHandler CqeHandler;
> +struct CqeHandler {
> +    /* Called by the AioContext when the request has completed */
> +    void (*cb)(CqeHandler *handler);

I see an opaque pointer alongside the prep_sqe callback below, but not
one here.  Is that because callers can write their own struct that
includes a CqeHandler as its first member, if more state is needed?
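Something like this hypothetical caller is what I'm picturing (all of
the names below are mine, not from this series):

#include "qemu/osdep.h"
#include "qemu/coroutine.h"
#include "block/aio.h"

/* Hypothetical per-request state with an embedded CqeHandler */
typedef struct {
    Coroutine *co;   /* coroutine waiting for completion */
    int fd;          /* file descriptor to operate on */
    int ret;         /* result, filled in by the completion callback */
    CqeHandler cqe_handler;
} MyFsyncOp;

/* Completion callback: recover the containing struct with container_of() */
static void my_fsync_cb(CqeHandler *cqe_handler)
{
    MyFsyncOp *op = container_of(cqe_handler, MyFsyncOp, cqe_handler);

    /* io_uring convention: cqe.res holds the result, negative errno on error */
    op->ret = cqe_handler->cqe.res;
    aio_co_wake(op->co);
}

If so, container_of() means the handler doesn't even have to be the
first member, and no extra opaque pointer is needed.  Spelling that
expectation out in the CqeHandler comment might save the next reader
the same question.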
> +
> +    /* Used internally, do not access this */
> +    QSIMPLEQ_ENTRY(CqeHandler) next;
> +
> +    /* This field is filled in before ->cb() is called */
> +    struct io_uring_cqe cqe;
> +};
> +
> +typedef QSIMPLEQ_HEAD(, CqeHandler) CqeHandlerSimpleQ;
> +#endif /* CONFIG_LINUX_IO_URING */
> +
>  /* Callbacks for file descriptor monitoring implementations */
>  typedef struct {
>      /*
> @@ -138,6 +159,27 @@ typedef struct {
>       * Called with list_lock incremented.
>       */
>      void (*gsource_dispatch)(AioContext *ctx, AioHandlerList *ready_list);
> +
> +#ifdef CONFIG_LINUX_IO_URING
> +    /**
> +     * aio_add_sqe: Add an io_uring sqe for submission.
> +     * @prep_sqe: invoked with an sqe that should be prepared for submission
> +     * @opaque: user-defined argument to @prep_sqe()
> +     * @cqe_handler: the unique cqe handler associated with this request
> +     *
> +     * The caller's @prep_sqe() function is invoked to fill in the details of
> +     * the sqe. Do not call io_uring_sqe_set_data() on this sqe.
> +     *
> +     * The kernel may see the sqe as soon as @pre_sqe() returns or it may take

s/pre_sqe/prep_sqe/

> +     * until the next event loop iteration.
> +     *
> +     * This function is called from the current AioContext and is not
> +     * thread-safe.
> +     */
> +    void (*add_sqe)(AioContext *ctx,
> +                    void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
> +                    void *opaque, CqeHandler *cqe_handler);
> +#endif /* CONFIG_LINUX_IO_URING */
>  } FDMonOps;
>
>  /*
> @@ -255,6 +297,10 @@ struct AioContext {
>      struct io_uring fdmon_io_uring;
>      AioHandlerSList submit_list;
>      gpointer io_uring_fd_tag;
> +
> +    /* Pending callback state for cqe handlers */
> +    CqeHandlerSimpleQ cqe_handler_ready_list;
> +    QEMUBH *cqe_handler_bh;
>  #endif

While here, is it worth adding a comment stating which #ifdef this
#endif closes (similar to what you did above for the FDMonOps add_sqe
block)?

>
>      /* TimerLists for calling timers - one per clock type. Has its own
> @@ -761,4 +807,40 @@ void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch);
>   */
>  void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
>                                          int64_t max, Error **errp);
> +
> +#ifdef CONFIG_LINUX_IO_URING
> +/**
> + * aio_has_io_uring: Return whether io_uring is available.
> + *
> + * io_uring is either available in all AioContexts or in none, so this only
> + * needs to be called once from within any thread's AioContext.
> + */
> +static inline bool aio_has_io_uring(void)
> +{
> +    AioContext *ctx = qemu_get_current_aio_context();
> +    return ctx->fdmon_ops->add_sqe;
> +}
> +
> +/**
> + * aio_add_sqe: Add an io_uring sqe for submission.
> + * @prep_sqe: invoked with an sqe that should be prepared for submission
> + * @opaque: user-defined argument to @prep_sqe()
> + * @cqe_handler: the unique cqe handler associated with this request
> + *
> + * The caller's @prep_sqe() function is invoked to fill in the details of the
> + * sqe. Do not call io_uring_sqe_set_data() on this sqe.
> + *
> + * The sqe is submitted by the current AioContext. The kernel may see the sqe
> + * as soon as @pre_sqe() returns or it may take until the next event loop

s/pre_sqe/prep_sqe/ again

> + * iteration.
> + *
> + * When the AioContext is destroyed, pending sqes are ignored and their
> + * CqeHandlers are not invoked.
> + *
> + * This function must be called only when aio_has_io_uring() returns true.
> + */
> +void aio_add_sqe(void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
> +                 void *opaque, CqeHandler *cqe_handler);
> +#endif /* CONFIG_LINUX_IO_URING */
> +
>  #endif
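As a sanity check on the "Do not call io_uring_sqe_set_data()" contract,
I imagine the submission side of my hypothetical example above would
read roughly like this (MyFsyncOp and my_fsync_cb are from my earlier
sketch; <liburing.h> assumed):

/* prep_sqe callback: fill in the sqe; fdmon-io_uring owns the user_data */
static void my_fsync_prep(struct io_uring_sqe *sqe, void *opaque)
{
    MyFsyncOp *op = opaque;

    io_uring_prep_fsync(sqe, op->fd, 0);
    /* deliberately no io_uring_sqe_set_data() here, per the contract above */
}

/* Coroutine context: submit the request and wait for my_fsync_cb() */
static int coroutine_fn my_fsync_co(int fd)
{
    MyFsyncOp op = {
        .co = qemu_coroutine_self(),
        .fd = fd,
        .cqe_handler = { .cb = my_fsync_cb },
    };

    assert(aio_has_io_uring());
    aio_add_sqe(my_fsync_prep, &op, &op.cqe_handler);

    qemu_coroutine_yield();   /* woken by my_fsync_cb() via aio_co_wake() */
    return op.ret;
}

If that matches your intent, folding a stripped-down version of it into
the doc comment would probably preempt the io_uring_sqe_set_data()
habit.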
> diff --git a/util/aio-posix.h b/util/aio-posix.h
> index 6f9d97d866..57ef801a5f 100644
> --- a/util/aio-posix.h
> +++ b/util/aio-posix.h
> @@ -36,6 +36,7 @@ struct AioHandler {
>  #ifdef CONFIG_LINUX_IO_URING
>      QSLIST_ENTRY(AioHandler) node_submitted;
>      unsigned flags; /* see fdmon-io_uring.c */
> +    CqeHandler cqe_handler;
>  #endif
>      int64_t poll_idle_timeout; /* when to stop userspace polling */
>      bool poll_ready; /* has polling detected an event? */
> diff --git a/util/aio-posix.c b/util/aio-posix.c
> index 44b3df61f9..89bb215a2f 100644
> --- a/util/aio-posix.c
> +++ b/util/aio-posix.c
> @@ -790,3 +790,12 @@ void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch)
>
>      aio_notify(ctx);
>  }
> +
> +#ifdef CONFIG_LINUX_IO_URING
> +void aio_add_sqe(void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
> +                 void *opaque, CqeHandler *cqe_handler)
> +{
> +    AioContext *ctx = qemu_get_current_aio_context();
> +    ctx->fdmon_ops->add_sqe(ctx, prep_sqe, opaque, cqe_handler);
> +}
> +#endif /* CONFIG_LINUX_IO_URING */
> diff --git a/util/fdmon-io_uring.c b/util/fdmon-io_uring.c
> index ef1a866a03..3a49d6a20a 100644
> --- a/util/fdmon-io_uring.c
> +++ b/util/fdmon-io_uring.c
> @@ -75,8 +75,8 @@ static inline int pfd_events_from_poll(int poll_events)
>  }
>
>  /*
> - * Returns an sqe for submitting a request. Only be called within
> - * fdmon_io_uring_wait().
> + * Returns an sqe for submitting a request. Only called from the AioContext
> + * thread.
>   */
>  static struct io_uring_sqe *get_sqe(AioContext *ctx)
>  {
> @@ -166,23 +166,43 @@ static void fdmon_io_uring_update(AioContext *ctx,
>      }
>  }
>
> +static void fdmon_io_uring_add_sqe(AioContext *ctx,
> +        void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
> +        void *opaque, CqeHandler *cqe_handler)
> +{
> +    struct io_uring_sqe *sqe = get_sqe(ctx);
> +
> +    prep_sqe(sqe, opaque);
> +    io_uring_sqe_set_data(sqe, cqe_handler);
> +}
> +
> +static void fdmon_special_cqe_handler(CqeHandler *cqe_handler)
> +{
> +    /*
> +     * This is an empty function that is never called. It is used as a
> +     * function pointer to distinguish it from ordinary cqe handlers.
> +     */
> +}
> +
>  static void add_poll_multishot_sqe(AioContext *ctx, AioHandler *node)
>  {
>      struct io_uring_sqe *sqe = get_sqe(ctx);
>      int events = poll_events_from_pfd(node->pfd.events);
>
>      io_uring_prep_poll_multishot(sqe, node->pfd.fd, events);
> -    io_uring_sqe_set_data(sqe, node);
> +    node->cqe_handler.cb = fdmon_special_cqe_handler;
> +    io_uring_sqe_set_data(sqe, &node->cqe_handler);
>  }
>
>  static void add_poll_remove_sqe(AioContext *ctx, AioHandler *node)
>  {
>      struct io_uring_sqe *sqe = get_sqe(ctx);
> +    CqeHandler *cqe_handler = &node->cqe_handler;
>
>  #ifdef LIBURING_HAVE_DATA64
> -    io_uring_prep_poll_remove(sqe, (uintptr_t)node);
> +    io_uring_prep_poll_remove(sqe, (uintptr_t)cqe_handler);
>  #else
> -    io_uring_prep_poll_remove(sqe, node);
> +    io_uring_prep_poll_remove(sqe, cqe_handler);
>  #endif
>      io_uring_sqe_set_data(sqe, NULL);
>  }
> @@ -221,20 +241,12 @@ static void fill_sq_ring(AioContext *ctx)
>      }
>  }
>
> -/* Returns true if a handler became ready */
> -static bool process_cqe(AioContext *ctx,
> -                        AioHandlerList *ready_list,
> -                        struct io_uring_cqe *cqe)
> +static bool process_cqe_aio_handler(AioContext *ctx,
> +                                    AioHandlerList *ready_list,
> +                                    AioHandler *node,
> +                                    struct io_uring_cqe *cqe)
>  {
> -    AioHandler *node = io_uring_cqe_get_data(cqe);
> -    unsigned flags;
> -
> -    /* poll_timeout and poll_remove have a zero user_data field */
> -    if (!node) {
> -        return false;
> -    }
> -
> -    flags = qatomic_read(&node->flags);
> +    unsigned flags = qatomic_read(&node->flags);
>
>      /*
>       * poll_multishot cancelled by poll_remove? Or completed early because fd
> @@ -261,6 +273,56 @@ static bool process_cqe(AioContext *ctx,
>      return true;
>  }
>
> +/* Process CqeHandlers from the ready list */
> +static void cqe_handler_bh(void *opaque)
> +{
> +    AioContext *ctx = opaque;
> +    CqeHandlerSimpleQ *ready_list = &ctx->cqe_handler_ready_list;
> +
> +    /*
> +     * If cqe_handler->cb() calls aio_poll() it must continue processing
> +     * ready_list. Schedule a BH so the inner event loop calls us again.
> +     */
> +    qemu_bh_schedule(ctx->cqe_handler_bh);
> +
> +    while (!QSIMPLEQ_EMPTY(ready_list)) {
> +        CqeHandler *cqe_handler = QSIMPLEQ_FIRST(ready_list);
> +
> +        QSIMPLEQ_REMOVE_HEAD(ready_list, next);
> +
> +        cqe_handler->cb(cqe_handler);
> +    }
> +
> +    qemu_bh_cancel(ctx->cqe_handler_bh);
> +}
> +
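The schedule-then-cancel dance here took me a second read, but I see why
it's there.  If I'm reading it right, even a callback that spins a
nested event loop stays safe, e.g. this hypothetical one (again, my
names, not the patch's):

/* Hypothetical state for a callback that also waits on something else */
typedef struct {
    CqeHandler cqe_handler;
    int ret;
    bool done;   /* hypothetically set by some other completion path */
} MyDrainOp;

static void my_blocking_cb(CqeHandler *cqe_handler)
{
    MyDrainOp *op = container_of(cqe_handler, MyDrainOp, cqe_handler);
    AioContext *ctx = qemu_get_current_aio_context();

    op->ret = cqe_handler->cqe.res;

    /*
     * Safe: cqe_handler_bh() re-scheduled itself before invoking us, so the
     * nested aio_poll() re-enters it and dispatches any other CqeHandlers
     * still sitting on cqe_handler_ready_list instead of stranding them
     * until the outer loop resumes.
     */
    while (!op->done) {
        aio_poll(ctx, true);
    }
}

No change needed; just confirming my understanding for the archives.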
> +/* Returns true if a handler became ready */
> +static bool process_cqe(AioContext *ctx,
> +                        AioHandlerList *ready_list,
> +                        struct io_uring_cqe *cqe)
> +{
> +    CqeHandler *cqe_handler = io_uring_cqe_get_data(cqe);
> +
> +    /* poll_timeout and poll_remove have a zero user_data field */
> +    if (!cqe_handler) {
> +        return false;
> +    }
> +
> +    /*
> +     * Special handling for AioHandler cqes. They need ready_list and have a
> +     * return value.
> +     */
> +    if (cqe_handler->cb == fdmon_special_cqe_handler) {
> +        AioHandler *node = container_of(cqe_handler, AioHandler, cqe_handler);
> +        return process_cqe_aio_handler(ctx, ready_list, node, cqe);
> +    }
> +
> +    cqe_handler->cqe = *cqe;
> +    QSIMPLEQ_INSERT_TAIL(&ctx->cqe_handler_ready_list, cqe_handler, next);
> +    qemu_bh_schedule(ctx->cqe_handler_bh);
> +    return false;
> +}
> +
>  static int process_cq_ring(AioContext *ctx, AioHandlerList *ready_list)
>  {
>      struct io_uring *ring = &ctx->fdmon_io_uring;
> @@ -360,6 +422,7 @@ static const FDMonOps fdmon_io_uring_ops = {
>      .gsource_prepare = fdmon_io_uring_gsource_prepare,
>      .gsource_check = fdmon_io_uring_gsource_check,
>      .gsource_dispatch = fdmon_io_uring_gsource_dispatch,
> +    .add_sqe = fdmon_io_uring_add_sqe,
>  };
>
>  void fdmon_io_uring_setup(AioContext *ctx, Error **errp)
> @@ -375,6 +438,8 @@ void fdmon_io_uring_setup(AioContext *ctx, Error **errp)
>      }
>
>      QSLIST_INIT(&ctx->submit_list);
> +    QSIMPLEQ_INIT(&ctx->cqe_handler_ready_list);
> +    ctx->cqe_handler_bh = aio_bh_new(ctx, cqe_handler_bh, ctx);
>      ctx->fdmon_ops = &fdmon_io_uring_ops;
>      ctx->io_uring_fd_tag = g_source_add_unix_fd(&ctx->source,
>                                                  ctx->fdmon_io_uring.ring_fd, G_IO_IN);
> @@ -382,30 +447,36 @@ void fdmon_io_uring_setup(AioContext *ctx, Error **errp)
>
>  void fdmon_io_uring_destroy(AioContext *ctx)
>  {
> -    if (ctx->fdmon_ops == &fdmon_io_uring_ops) {
> -        AioHandler *node;
> +    AioHandler *node;
>
> -        io_uring_queue_exit(&ctx->fdmon_io_uring);
> +    if (ctx->fdmon_ops != &fdmon_io_uring_ops) {
> +        return;
> +    }
>
> -        /* Move handlers due to be removed onto the deleted list */
> -        while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) {
> -            unsigned flags = qatomic_fetch_and(&node->flags,
> -                                               ~(FDMON_IO_URING_PENDING |
> -                                                 FDMON_IO_URING_ADD |
> -                                                 FDMON_IO_URING_REMOVE));
> +    io_uring_queue_exit(&ctx->fdmon_io_uring);
>
> -            if (flags & FDMON_IO_URING_REMOVE) {
> -                QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
> -            }
> +    /* Move handlers due to be removed onto the deleted list */
> +    while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) {
> +        unsigned flags = qatomic_fetch_and(&node->flags,
> +                                           ~(FDMON_IO_URING_PENDING |
> +                                             FDMON_IO_URING_ADD |
> +                                             FDMON_IO_URING_REMOVE));
>
> -            QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted);
> +        if (flags & FDMON_IO_URING_REMOVE) {
> +            QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers,
> +                                  node, node_deleted);
>          }
>
> -        g_source_remove_unix_fd(&ctx->source, ctx->io_uring_fd_tag);
> -        ctx->io_uring_fd_tag = NULL;
> -
> -        qemu_lockcnt_lock(&ctx->list_lock);
> -        fdmon_poll_downgrade(ctx);
> -        qemu_lockcnt_unlock(&ctx->list_lock);
> +        QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted);
>      }
> +
> +    g_source_remove_unix_fd(&ctx->source, ctx->io_uring_fd_tag);
> +    ctx->io_uring_fd_tag = NULL;
> +
> +    assert(QSIMPLEQ_EMPTY(&ctx->cqe_handler_ready_list));
> +    qemu_bh_delete(ctx->cqe_handler_bh);
> +
> +    qemu_lockcnt_lock(&ctx->list_lock);
> +    fdmon_poll_downgrade(ctx);
> +    qemu_lockcnt_unlock(&ctx->list_lock);
>  }
> --
> 2.49.0
>
>

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.
Virtualization:  qemu.org | libguestfs.org