On Tue, Mar 04, 2025 at 10:50:29PM +0100, Maciej S. Szmigiero wrote: > On 26.02.2025 17:43, Peter Xu wrote: > > On Wed, Feb 19, 2025 at 09:33:59PM +0100, Maciej S. Szmigiero wrote: > > > From: "Maciej S. Szmigiero" <maciej.szmigi...@oracle.com> > > > > > > This SaveVMHandler helps device provide its own asynchronous transmission > > > of the remaining data at the end of a precopy phase via multifd channels, > > > in parallel with the transfer done by save_live_complete_precopy handlers. > > > > > > These threads are launched only when multifd device state transfer is > > > supported. > > > > > > Management of these threads in done in the multifd migration code, > > > wrapping them in the generic thread pool. > > > > > > Signed-off-by: Maciej S. Szmigiero <maciej.szmigi...@oracle.com> > > > --- > > > include/migration/misc.h | 17 +++++++ > > > include/migration/register.h | 19 +++++++ > > > include/qemu/typedefs.h | 3 ++ > > > migration/multifd-device-state.c | 85 ++++++++++++++++++++++++++++++++ > > > migration/savevm.c | 35 ++++++++++++- > > > 5 files changed, 158 insertions(+), 1 deletion(-) > > > > > > diff --git a/include/migration/misc.h b/include/migration/misc.h > > > index 273ebfca6256..8fd36eba1da7 100644 > > > --- a/include/migration/misc.h > > > +++ b/include/migration/misc.h > > > @@ -119,8 +119,25 @@ bool migrate_uri_parse(const char *uri, > > > MigrationChannel **channel, > > > Error **errp); > > > /* migration/multifd-device-state.c */ > > > +typedef struct SaveLiveCompletePrecopyThreadData { > > > + SaveLiveCompletePrecopyThreadHandler hdlr; > > > + char *idstr; > > > + uint32_t instance_id; > > > + void *handler_opaque; > > > +} SaveLiveCompletePrecopyThreadData; > > > + > > > bool multifd_queue_device_state(char *idstr, uint32_t instance_id, > > > char *data, size_t len); > > > bool multifd_device_state_supported(void); > > > +void > > > +multifd_spawn_device_state_save_thread(SaveLiveCompletePrecopyThreadHandler > > > hdlr, > > > + char *idstr, uint32_t instance_id, > > > + void *opaque); > > > + > > > +bool multifd_device_state_save_thread_should_exit(void); > > > + > > > +void multifd_abort_device_state_save_threads(void); > > > +bool multifd_join_device_state_save_threads(void); > > > + > > > #endif > > > diff --git a/include/migration/register.h b/include/migration/register.h > > > index 58891aa54b76..c041ce32f2fc 100644 > > > --- a/include/migration/register.h > > > +++ b/include/migration/register.h > > > @@ -105,6 +105,25 @@ typedef struct SaveVMHandlers { > > > */ > > > int (*save_live_complete_precopy)(QEMUFile *f, void *opaque); > > > + /** > > > + * @save_live_complete_precopy_thread (invoked in a separate thread) > > > + * > > > + * Called at the end of a precopy phase from a separate worker thread > > > + * in configurations where multifd device state transfer is supported > > > + * in order to perform asynchronous transmission of the remaining > > > data in > > > + * parallel with @save_live_complete_precopy handlers. > > > + * When postcopy is enabled, devices that support postcopy will skip > > > this > > > + * step. > > > + * > > > + * @d: a #SaveLiveCompletePrecopyThreadData containing parameters > > > that the > > > + * handler may need, including this device section idstr and > > > instance_id, > > > + * and opaque data pointer passed to register_savevm_live(). > > > + * @errp: pointer to Error*, to store an error if it happens. > > > + * > > > + * Returns true to indicate success and false for errors. > > > + */ > > > + SaveLiveCompletePrecopyThreadHandler > > > save_live_complete_precopy_thread; > > > + > > > /* This runs both outside and inside the BQL. */ > > > /** > > > diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h > > > index fd23ff7771b1..42ed4e6be150 100644 > > > --- a/include/qemu/typedefs.h > > > +++ b/include/qemu/typedefs.h > > > @@ -108,6 +108,7 @@ typedef struct QString QString; > > > typedef struct RAMBlock RAMBlock; > > > typedef struct Range Range; > > > typedef struct ReservedRegion ReservedRegion; > > > +typedef struct SaveLiveCompletePrecopyThreadData > > > SaveLiveCompletePrecopyThreadData; > > > typedef struct SHPCDevice SHPCDevice; > > > typedef struct SSIBus SSIBus; > > > typedef struct TCGCPUOps TCGCPUOps; > > > @@ -133,5 +134,7 @@ typedef struct IRQState *qemu_irq; > > > typedef void (*qemu_irq_handler)(void *opaque, int n, int level); > > > typedef bool (*MigrationLoadThread)(void *opaque, bool *should_quit, > > > Error **errp); > > > +typedef bool > > > (*SaveLiveCompletePrecopyThreadHandler)(SaveLiveCompletePrecopyThreadData > > > *d, > > > + Error **errp); > > > #endif /* QEMU_TYPEDEFS_H */ > > > diff --git a/migration/multifd-device-state.c > > > b/migration/multifd-device-state.c > > > index 5de3cf27d6e8..63f021fb8dad 100644 > > > --- a/migration/multifd-device-state.c > > > +++ b/migration/multifd-device-state.c > > > @@ -8,7 +8,10 @@ > > > */ > > > #include "qemu/osdep.h" > > > +#include "qapi/error.h" > > > #include "qemu/lockable.h" > > > +#include "block/thread-pool.h" > > > +#include "migration.h" > > > #include "migration/misc.h" > > > #include "multifd.h" > > > #include "options.h" > > > @@ -17,6 +20,9 @@ static struct { > > > QemuMutex queue_job_mutex; > > > MultiFDSendData *send_data; > > > + > > > + ThreadPool *threads; > > > + bool threads_abort; > > > } *multifd_send_device_state; > > > void multifd_device_state_send_setup(void) > > > @@ -27,10 +33,14 @@ void multifd_device_state_send_setup(void) > > > qemu_mutex_init(&multifd_send_device_state->queue_job_mutex); > > > multifd_send_device_state->send_data = multifd_send_data_alloc(); > > > + > > > + multifd_send_device_state->threads = thread_pool_new(); > > > + multifd_send_device_state->threads_abort = false; > > > } > > > void multifd_device_state_send_cleanup(void) > > > { > > > + g_clear_pointer(&multifd_send_device_state->threads, > > > thread_pool_free); > > > g_clear_pointer(&multifd_send_device_state->send_data, > > > multifd_send_data_free); > > > @@ -115,3 +125,78 @@ bool multifd_device_state_supported(void) > > > return migrate_multifd() && !migrate_mapped_ram() && > > > migrate_multifd_compression() == MULTIFD_COMPRESSION_NONE; > > > } > > > + > > > +static void multifd_device_state_save_thread_data_free(void *opaque) > > > +{ > > > + SaveLiveCompletePrecopyThreadData *data = opaque; > > > + > > > + g_clear_pointer(&data->idstr, g_free); > > > + g_free(data); > > > +} > > > + > > > +static int multifd_device_state_save_thread(void *opaque) > > > +{ > > > + SaveLiveCompletePrecopyThreadData *data = opaque; > > > + g_autoptr(Error) local_err = NULL; > > > + > > > + if (!data->hdlr(data, &local_err)) { > > > + MigrationState *s = migrate_get_current(); > > > + > > > + assert(local_err); > > > + > > > + /* > > > + * In case of multiple save threads failing which thread error > > > + * return we end setting is purely arbitrary. > > > + */ > > > + migrate_set_error(s, local_err); > > > > Where did you kick off all the threads when one hit error? I wonder if > > migrate_set_error() should just set quit flag for everything, but for this > > series it might be easier to use multifd_abort_device_state_save_threads(). > > I've now added call to multifd_abort_device_state_save_threads() if a > migration > error is already set to avoid needlessly waiting for the remaining threads to > do all of their work.
With that, feel free to take: Reviewed-by: Peter Xu <pet...@redhat.com> -- Peter Xu