aio-posix: add aio_add_sqe() API for user-defined io_uring requests
Introduce the aio_add_sqe() API for submitting io_uring requests in the current AioContext. This allows other components in QEMU, like the block layer, to take advantage of io_uring features without creating their own io_uring context. This API supports nested event loops just like file descriptor monitoring and BHs do. This comes at a complexity cost: CQE callbacks must be placed on a list so that nested event loops can invoke pending CQE callbacks from parent event loops. If you're wondering why CqeHandler exists instead of just a callback function pointer, this is why. Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com> Reviewed-by: Eric Blake <eblake@redhat.com> Message-ID: <20251104022933.618123-14-stefanha@redhat.com> Reviewed-by: Kevin Wolf <kwolf@redhat.com> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
This commit is contained in:
parent
87e7a0f423
commit
1eebdab3c3
5 changed files with 190 additions and 16 deletions
|
|
@ -61,6 +61,27 @@ typedef struct LuringState LuringState;
|
||||||
/* Is polling disabled? */
|
/* Is polling disabled? */
|
||||||
bool aio_poll_disabled(AioContext *ctx);
|
bool aio_poll_disabled(AioContext *ctx);
|
||||||
|
|
||||||
|
#ifdef CONFIG_LINUX_IO_URING
|
||||||
|
/*
|
||||||
|
* Each io_uring request must have a unique CqeHandler that processes the cqe.
|
||||||
|
* The lifetime of a CqeHandler must be at least from aio_add_sqe() until
|
||||||
|
* ->cb() invocation.
|
||||||
|
*/
|
||||||
|
typedef struct CqeHandler CqeHandler;
|
||||||
|
struct CqeHandler {
|
||||||
|
/* Called by the AioContext when the request has completed */
|
||||||
|
void (*cb)(CqeHandler *handler);
|
||||||
|
|
||||||
|
/* Used internally, do not access this */
|
||||||
|
QSIMPLEQ_ENTRY(CqeHandler) next;
|
||||||
|
|
||||||
|
/* This field is filled in before ->cb() is called */
|
||||||
|
struct io_uring_cqe cqe;
|
||||||
|
};
|
||||||
|
|
||||||
|
typedef QSIMPLEQ_HEAD(, CqeHandler) CqeHandlerSimpleQ;
|
||||||
|
#endif /* CONFIG_LINUX_IO_URING */
|
||||||
|
|
||||||
/* Callbacks for file descriptor monitoring implementations */
|
/* Callbacks for file descriptor monitoring implementations */
|
||||||
typedef struct {
|
typedef struct {
|
||||||
/*
|
/*
|
||||||
|
|
@ -157,6 +178,27 @@ typedef struct {
|
||||||
* Called with list_lock incremented.
|
* Called with list_lock incremented.
|
||||||
*/
|
*/
|
||||||
void (*gsource_dispatch)(AioContext *ctx, AioHandlerList *ready_list);
|
void (*gsource_dispatch)(AioContext *ctx, AioHandlerList *ready_list);
|
||||||
|
|
||||||
|
#ifdef CONFIG_LINUX_IO_URING
|
||||||
|
/**
|
||||||
|
* add_sqe: Add an io_uring sqe for submission.
|
||||||
|
* @prep_sqe: invoked with an sqe that should be prepared for submission
|
||||||
|
* @opaque: user-defined argument to @prep_sqe()
|
||||||
|
* @cqe_handler: the unique cqe handler associated with this request
|
||||||
|
*
|
||||||
|
* The caller's @prep_sqe() function is invoked to fill in the details of
|
||||||
|
* the sqe. Do not call io_uring_sqe_set_data() on this sqe.
|
||||||
|
*
|
||||||
|
* The kernel may see the sqe as soon as @prep_sqe() returns or it may take
|
||||||
|
* until the next event loop iteration.
|
||||||
|
*
|
||||||
|
* This function is called from the current AioContext and is not
|
||||||
|
* thread-safe.
|
||||||
|
*/
|
||||||
|
void (*add_sqe)(AioContext *ctx,
|
||||||
|
void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
|
||||||
|
void *opaque, CqeHandler *cqe_handler);
|
||||||
|
#endif /* CONFIG_LINUX_IO_URING */
|
||||||
} FDMonOps;
|
} FDMonOps;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
@ -274,7 +316,10 @@ struct AioContext {
|
||||||
struct io_uring fdmon_io_uring;
|
struct io_uring fdmon_io_uring;
|
||||||
AioHandlerSList submit_list;
|
AioHandlerSList submit_list;
|
||||||
void *io_uring_fd_tag;
|
void *io_uring_fd_tag;
|
||||||
#endif
|
|
||||||
|
/* Pending callback state for cqe handlers */
|
||||||
|
CqeHandlerSimpleQ cqe_handler_ready_list;
|
||||||
|
#endif /* CONFIG_LINUX_IO_URING */
|
||||||
|
|
||||||
/* TimerLists for calling timers - one per clock type. Has its own
|
/* TimerLists for calling timers - one per clock type. Has its own
|
||||||
* locking.
|
* locking.
|
||||||
|
|
@ -782,4 +827,40 @@ void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch);
|
||||||
*/
|
*/
|
||||||
void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
|
void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
|
||||||
int64_t max, Error **errp);
|
int64_t max, Error **errp);
|
||||||
|
|
||||||
|
#ifdef CONFIG_LINUX_IO_URING
|
||||||
|
/**
|
||||||
|
* aio_has_io_uring: Return whether io_uring is available.
|
||||||
|
*
|
||||||
|
* io_uring is either available in all AioContexts or in none, so this only
|
||||||
|
* needs to be called once from within any thread's AioContext.
|
||||||
|
*/
|
||||||
|
static inline bool aio_has_io_uring(void)
|
||||||
|
{
|
||||||
|
AioContext *ctx = qemu_get_current_aio_context();
|
||||||
|
return ctx->fdmon_ops->add_sqe;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* aio_add_sqe: Add an io_uring sqe for submission.
|
||||||
|
* @prep_sqe: invoked with an sqe that should be prepared for submission
|
||||||
|
* @opaque: user-defined argument to @prep_sqe()
|
||||||
|
* @cqe_handler: the unique cqe handler associated with this request
|
||||||
|
*
|
||||||
|
* The caller's @prep_sqe() function is invoked to fill in the details of the
|
||||||
|
* sqe. Do not call io_uring_sqe_set_data() on this sqe.
|
||||||
|
*
|
||||||
|
* The sqe is submitted by the current AioContext. The kernel may see the sqe
|
||||||
|
* as soon as @prep_sqe() returns or it may take until the next event loop
|
||||||
|
* iteration.
|
||||||
|
*
|
||||||
|
* When the AioContext is destroyed, pending sqes are ignored and their
|
||||||
|
* CqeHandlers are not invoked.
|
||||||
|
*
|
||||||
|
* This function must be called only when aio_has_io_uring() returns true.
|
||||||
|
*/
|
||||||
|
void aio_add_sqe(void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
|
||||||
|
void *opaque, CqeHandler *cqe_handler);
|
||||||
|
#endif /* CONFIG_LINUX_IO_URING */
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
||||||
|
|
@ -806,3 +806,12 @@ void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch)
|
||||||
|
|
||||||
aio_notify(ctx);
|
aio_notify(ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef CONFIG_LINUX_IO_URING
|
||||||
|
void aio_add_sqe(void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
|
||||||
|
void *opaque, CqeHandler *cqe_handler)
|
||||||
|
{
|
||||||
|
AioContext *ctx = qemu_get_current_aio_context();
|
||||||
|
ctx->fdmon_ops->add_sqe(ctx, prep_sqe, opaque, cqe_handler);
|
||||||
|
}
|
||||||
|
#endif /* CONFIG_LINUX_IO_URING */
|
||||||
|
|
|
||||||
|
|
@ -36,6 +36,7 @@ struct AioHandler {
|
||||||
#ifdef CONFIG_LINUX_IO_URING
|
#ifdef CONFIG_LINUX_IO_URING
|
||||||
QSLIST_ENTRY(AioHandler) node_submitted;
|
QSLIST_ENTRY(AioHandler) node_submitted;
|
||||||
unsigned flags; /* see fdmon-io_uring.c */
|
unsigned flags; /* see fdmon-io_uring.c */
|
||||||
|
CqeHandler internal_cqe_handler; /* used for POLL_ADD/POLL_REMOVE */
|
||||||
#endif
|
#endif
|
||||||
int64_t poll_idle_timeout; /* when to stop userspace polling */
|
int64_t poll_idle_timeout; /* when to stop userspace polling */
|
||||||
bool poll_ready; /* has polling detected an event? */
|
bool poll_ready; /* has polling detected an event? */
|
||||||
|
|
|
||||||
|
|
@ -46,8 +46,10 @@
|
||||||
#include "qemu/osdep.h"
|
#include "qemu/osdep.h"
|
||||||
#include <poll.h>
|
#include <poll.h>
|
||||||
#include "qapi/error.h"
|
#include "qapi/error.h"
|
||||||
|
#include "qemu/defer-call.h"
|
||||||
#include "qemu/rcu_queue.h"
|
#include "qemu/rcu_queue.h"
|
||||||
#include "aio-posix.h"
|
#include "aio-posix.h"
|
||||||
|
#include "trace.h"
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
FDMON_IO_URING_ENTRIES = 128, /* sq/cq ring size */
|
FDMON_IO_URING_ENTRIES = 128, /* sq/cq ring size */
|
||||||
|
|
@ -76,8 +78,8 @@ static inline int pfd_events_from_poll(int poll_events)
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Returns an sqe for submitting a request. Only be called within
|
* Returns an sqe for submitting a request. Only called from the AioContext
|
||||||
* fdmon_io_uring_wait().
|
* thread.
|
||||||
*/
|
*/
|
||||||
static struct io_uring_sqe *get_sqe(AioContext *ctx)
|
static struct io_uring_sqe *get_sqe(AioContext *ctx)
|
||||||
{
|
{
|
||||||
|
|
@ -168,23 +170,46 @@ static void fdmon_io_uring_update(AioContext *ctx,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void fdmon_io_uring_add_sqe(AioContext *ctx,
|
||||||
|
void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
|
||||||
|
void *opaque, CqeHandler *cqe_handler)
|
||||||
|
{
|
||||||
|
struct io_uring_sqe *sqe = get_sqe(ctx);
|
||||||
|
|
||||||
|
prep_sqe(sqe, opaque);
|
||||||
|
io_uring_sqe_set_data(sqe, cqe_handler);
|
||||||
|
|
||||||
|
trace_fdmon_io_uring_add_sqe(ctx, opaque, sqe->opcode, sqe->fd, sqe->off,
|
||||||
|
cqe_handler);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void fdmon_special_cqe_handler(CqeHandler *cqe_handler)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* This is an empty function that is never called. It is used as a function
|
||||||
|
* pointer to distinguish it from ordinary cqe handlers.
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
|
||||||
static void add_poll_add_sqe(AioContext *ctx, AioHandler *node)
|
static void add_poll_add_sqe(AioContext *ctx, AioHandler *node)
|
||||||
{
|
{
|
||||||
struct io_uring_sqe *sqe = get_sqe(ctx);
|
struct io_uring_sqe *sqe = get_sqe(ctx);
|
||||||
int events = poll_events_from_pfd(node->pfd.events);
|
int events = poll_events_from_pfd(node->pfd.events);
|
||||||
|
|
||||||
io_uring_prep_poll_add(sqe, node->pfd.fd, events);
|
io_uring_prep_poll_add(sqe, node->pfd.fd, events);
|
||||||
io_uring_sqe_set_data(sqe, node);
|
node->internal_cqe_handler.cb = fdmon_special_cqe_handler;
|
||||||
|
io_uring_sqe_set_data(sqe, &node->internal_cqe_handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void add_poll_remove_sqe(AioContext *ctx, AioHandler *node)
|
static void add_poll_remove_sqe(AioContext *ctx, AioHandler *node)
|
||||||
{
|
{
|
||||||
struct io_uring_sqe *sqe = get_sqe(ctx);
|
struct io_uring_sqe *sqe = get_sqe(ctx);
|
||||||
|
CqeHandler *cqe_handler = &node->internal_cqe_handler;
|
||||||
|
|
||||||
#ifdef LIBURING_HAVE_DATA64
|
#ifdef LIBURING_HAVE_DATA64
|
||||||
io_uring_prep_poll_remove(sqe, (uintptr_t)node);
|
io_uring_prep_poll_remove(sqe, (uintptr_t)cqe_handler);
|
||||||
#else
|
#else
|
||||||
io_uring_prep_poll_remove(sqe, node);
|
io_uring_prep_poll_remove(sqe, cqe_handler);
|
||||||
#endif
|
#endif
|
||||||
io_uring_sqe_set_data(sqe, NULL);
|
io_uring_sqe_set_data(sqe, NULL);
|
||||||
}
|
}
|
||||||
|
|
@ -219,19 +244,13 @@ static void fill_sq_ring(AioContext *ctx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Returns true if a handler became ready */
|
static bool process_cqe_aio_handler(AioContext *ctx,
|
||||||
static bool process_cqe(AioContext *ctx,
|
AioHandlerList *ready_list,
|
||||||
AioHandlerList *ready_list,
|
AioHandler *node,
|
||||||
struct io_uring_cqe *cqe)
|
struct io_uring_cqe *cqe)
|
||||||
{
|
{
|
||||||
AioHandler *node = io_uring_cqe_get_data(cqe);
|
|
||||||
unsigned flags;
|
unsigned flags;
|
||||||
|
|
||||||
/* poll_timeout and poll_remove have a zero user_data field */
|
|
||||||
if (!node) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Deletion can only happen when IORING_OP_POLL_ADD completes. If we race
|
* Deletion can only happen when IORING_OP_POLL_ADD completes. If we race
|
||||||
* with enqueue() here then we can safely clear the FDMON_IO_URING_REMOVE
|
* with enqueue() here then we can safely clear the FDMON_IO_URING_REMOVE
|
||||||
|
|
@ -255,6 +274,35 @@ static bool process_cqe(AioContext *ctx,
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Returns true if a handler became ready */
|
||||||
|
static bool process_cqe(AioContext *ctx,
|
||||||
|
AioHandlerList *ready_list,
|
||||||
|
struct io_uring_cqe *cqe)
|
||||||
|
{
|
||||||
|
CqeHandler *cqe_handler = io_uring_cqe_get_data(cqe);
|
||||||
|
|
||||||
|
/* poll_timeout and poll_remove have a zero user_data field */
|
||||||
|
if (!cqe_handler) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Special handling for AioHandler cqes. They need ready_list and have a
|
||||||
|
* return value.
|
||||||
|
*/
|
||||||
|
if (cqe_handler->cb == fdmon_special_cqe_handler) {
|
||||||
|
AioHandler *node = container_of(cqe_handler, AioHandler,
|
||||||
|
internal_cqe_handler);
|
||||||
|
return process_cqe_aio_handler(ctx, ready_list, node, cqe);
|
||||||
|
}
|
||||||
|
|
||||||
|
cqe_handler->cqe = *cqe;
|
||||||
|
|
||||||
|
/* Handlers are invoked later by fdmon_io_uring_dispatch() */
|
||||||
|
QSIMPLEQ_INSERT_TAIL(&ctx->cqe_handler_ready_list, cqe_handler, next);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
static int process_cq_ring(AioContext *ctx, AioHandlerList *ready_list)
|
static int process_cq_ring(AioContext *ctx, AioHandlerList *ready_list)
|
||||||
{
|
{
|
||||||
struct io_uring *ring = &ctx->fdmon_io_uring;
|
struct io_uring *ring = &ctx->fdmon_io_uring;
|
||||||
|
|
@ -299,6 +347,32 @@ static bool fdmon_io_uring_gsource_check(AioContext *ctx)
|
||||||
return g_source_query_unix_fd(&ctx->source, tag) & G_IO_IN;
|
return g_source_query_unix_fd(&ctx->source, tag) & G_IO_IN;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Dispatch CQE handlers that are ready */
|
||||||
|
static bool fdmon_io_uring_dispatch(AioContext *ctx)
|
||||||
|
{
|
||||||
|
CqeHandlerSimpleQ *ready_list = &ctx->cqe_handler_ready_list;
|
||||||
|
bool progress = false;
|
||||||
|
|
||||||
|
/* Handlers may use defer_call() to coalesce frequent operations */
|
||||||
|
defer_call_begin();
|
||||||
|
|
||||||
|
while (!QSIMPLEQ_EMPTY(ready_list)) {
|
||||||
|
CqeHandler *cqe_handler = QSIMPLEQ_FIRST(ready_list);
|
||||||
|
|
||||||
|
QSIMPLEQ_REMOVE_HEAD(ready_list, next);
|
||||||
|
|
||||||
|
trace_fdmon_io_uring_cqe_handler(ctx, cqe_handler,
|
||||||
|
cqe_handler->cqe.res);
|
||||||
|
cqe_handler->cb(cqe_handler);
|
||||||
|
progress = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
defer_call_end();
|
||||||
|
|
||||||
|
return progress;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* This is where CQEs are processed in the glib event loop */
|
/* This is where CQEs are processed in the glib event loop */
|
||||||
static void fdmon_io_uring_gsource_dispatch(AioContext *ctx,
|
static void fdmon_io_uring_gsource_dispatch(AioContext *ctx,
|
||||||
AioHandlerList *ready_list)
|
AioHandlerList *ready_list)
|
||||||
|
|
@ -371,9 +445,11 @@ static const FDMonOps fdmon_io_uring_ops = {
|
||||||
.update = fdmon_io_uring_update,
|
.update = fdmon_io_uring_update,
|
||||||
.wait = fdmon_io_uring_wait,
|
.wait = fdmon_io_uring_wait,
|
||||||
.need_wait = fdmon_io_uring_need_wait,
|
.need_wait = fdmon_io_uring_need_wait,
|
||||||
|
.dispatch = fdmon_io_uring_dispatch,
|
||||||
.gsource_prepare = fdmon_io_uring_gsource_prepare,
|
.gsource_prepare = fdmon_io_uring_gsource_prepare,
|
||||||
.gsource_check = fdmon_io_uring_gsource_check,
|
.gsource_check = fdmon_io_uring_gsource_check,
|
||||||
.gsource_dispatch = fdmon_io_uring_gsource_dispatch,
|
.gsource_dispatch = fdmon_io_uring_gsource_dispatch,
|
||||||
|
.add_sqe = fdmon_io_uring_add_sqe,
|
||||||
};
|
};
|
||||||
|
|
||||||
bool fdmon_io_uring_setup(AioContext *ctx, Error **errp)
|
bool fdmon_io_uring_setup(AioContext *ctx, Error **errp)
|
||||||
|
|
@ -389,6 +465,7 @@ bool fdmon_io_uring_setup(AioContext *ctx, Error **errp)
|
||||||
}
|
}
|
||||||
|
|
||||||
QSLIST_INIT(&ctx->submit_list);
|
QSLIST_INIT(&ctx->submit_list);
|
||||||
|
QSIMPLEQ_INIT(&ctx->cqe_handler_ready_list);
|
||||||
ctx->fdmon_ops = &fdmon_io_uring_ops;
|
ctx->fdmon_ops = &fdmon_io_uring_ops;
|
||||||
ctx->io_uring_fd_tag = g_source_add_unix_fd(&ctx->source,
|
ctx->io_uring_fd_tag = g_source_add_unix_fd(&ctx->source,
|
||||||
ctx->fdmon_io_uring.ring_fd, G_IO_IN);
|
ctx->fdmon_io_uring.ring_fd, G_IO_IN);
|
||||||
|
|
@ -425,6 +502,8 @@ void fdmon_io_uring_destroy(AioContext *ctx)
|
||||||
g_source_remove_unix_fd(&ctx->source, ctx->io_uring_fd_tag);
|
g_source_remove_unix_fd(&ctx->source, ctx->io_uring_fd_tag);
|
||||||
ctx->io_uring_fd_tag = NULL;
|
ctx->io_uring_fd_tag = NULL;
|
||||||
|
|
||||||
|
assert(QSIMPLEQ_EMPTY(&ctx->cqe_handler_ready_list));
|
||||||
|
|
||||||
qemu_lockcnt_lock(&ctx->list_lock);
|
qemu_lockcnt_lock(&ctx->list_lock);
|
||||||
fdmon_poll_downgrade(ctx);
|
fdmon_poll_downgrade(ctx);
|
||||||
qemu_lockcnt_unlock(&ctx->list_lock);
|
qemu_lockcnt_unlock(&ctx->list_lock);
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,10 @@ buffer_move_empty(const char *buf, size_t len, const char *from) "%s: %zd bytes
|
||||||
buffer_move(const char *buf, size_t len, const char *from) "%s: %zd bytes from %s"
|
buffer_move(const char *buf, size_t len, const char *from) "%s: %zd bytes from %s"
|
||||||
buffer_free(const char *buf, size_t len) "%s: capacity %zd"
|
buffer_free(const char *buf, size_t len) "%s: capacity %zd"
|
||||||
|
|
||||||
|
# fdmon-io_uring.c
|
||||||
|
fdmon_io_uring_add_sqe(void *ctx, void *opaque, int opcode, int fd, uint64_t off, void *cqe_handler) "ctx %p opaque %p opcode %d fd %d off %"PRIu64" cqe_handler %p"
|
||||||
|
fdmon_io_uring_cqe_handler(void *ctx, void *cqe_handler, int cqe_res) "ctx %p cqe_handler %p cqe_res %d"
|
||||||
|
|
||||||
# filemonitor-inotify.c
|
# filemonitor-inotify.c
|
||||||
qemu_file_monitor_add_watch(void *mon, const char *dirpath, const char *filename, void *cb, void *opaque, int64_t id) "File monitor %p add watch dir='%s' file='%s' cb=%p opaque=%p id=%" PRId64
|
qemu_file_monitor_add_watch(void *mon, const char *dirpath, const char *filename, void *cb, void *opaque, int64_t id) "File monitor %p add watch dir='%s' file='%s' cb=%p opaque=%p id=%" PRId64
|
||||||
qemu_file_monitor_remove_watch(void *mon, const char *dirpath, int64_t id) "File monitor %p remove watch dir='%s' id=%" PRId64
|
qemu_file_monitor_remove_watch(void *mon, const char *dirpath, int64_t id) "File monitor %p remove watch dir='%s' id=%" PRId64
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue