On Tue, Mar 04, 2025 at 10:50:29PM +0100, Maciej S. Szmigiero wrote:
> On 26.02.2025 17:43, Peter Xu wrote:
> > On Wed, Feb 19, 2025 at 09:33:59PM +0100, Maciej S. Szmigiero wrote:
> > > From: "Maciej S. Szmigiero" <maciej.szmigi...@oracle.com>
> > > 
> > > This SaveVMHandler helps a device provide its own asynchronous transmission
> > > of the remaining data at the end of a precopy phase via multifd channels,
> > > in parallel with the transfer done by save_live_complete_precopy handlers.
> > > 
> > > These threads are launched only when multifd device state transfer is
> > > supported.
> > > 
> > > Management of these threads is done in the multifd migration code,
> > > wrapping them in the generic thread pool.
> > > 
> > > Signed-off-by: Maciej S. Szmigiero <maciej.szmigi...@oracle.com>
> > > ---
> > >   include/migration/misc.h         | 17 +++++++
> > >   include/migration/register.h     | 19 +++++++
> > >   include/qemu/typedefs.h          |  3 ++
> > >   migration/multifd-device-state.c | 85 ++++++++++++++++++++++++++++++++
> > >   migration/savevm.c               | 35 ++++++++++++-
> > >   5 files changed, 158 insertions(+), 1 deletion(-)
> > > 
> > > diff --git a/include/migration/misc.h b/include/migration/misc.h
> > > index 273ebfca6256..8fd36eba1da7 100644
> > > --- a/include/migration/misc.h
> > > +++ b/include/migration/misc.h
> > > @@ -119,8 +119,25 @@ bool migrate_uri_parse(const char *uri, 
> > > MigrationChannel **channel,
> > >                          Error **errp);
> > >   /* migration/multifd-device-state.c */
> > > +typedef struct SaveLiveCompletePrecopyThreadData {
> > > +    SaveLiveCompletePrecopyThreadHandler hdlr;
> > > +    char *idstr;
> > > +    uint32_t instance_id;
> > > +    void *handler_opaque;
> > > +} SaveLiveCompletePrecopyThreadData;
> > > +
> > >   bool multifd_queue_device_state(char *idstr, uint32_t instance_id,
> > >                                   char *data, size_t len);
> > >   bool multifd_device_state_supported(void);
> > > +void
> > > +multifd_spawn_device_state_save_thread(SaveLiveCompletePrecopyThreadHandler
> > >  hdlr,
> > > +                                       char *idstr, uint32_t instance_id,
> > > +                                       void *opaque);
> > > +
> > > +bool multifd_device_state_save_thread_should_exit(void);
> > > +
> > > +void multifd_abort_device_state_save_threads(void);
> > > +bool multifd_join_device_state_save_threads(void);
> > > +
> > >   #endif
> > > diff --git a/include/migration/register.h b/include/migration/register.h
> > > index 58891aa54b76..c041ce32f2fc 100644
> > > --- a/include/migration/register.h
> > > +++ b/include/migration/register.h
> > > @@ -105,6 +105,25 @@ typedef struct SaveVMHandlers {
> > >        */
> > >       int (*save_live_complete_precopy)(QEMUFile *f, void *opaque);
> > > +    /**
> > > +     * @save_live_complete_precopy_thread (invoked in a separate thread)
> > > +     *
> > > +     * Called at the end of a precopy phase from a separate worker thread
> > > +     * in configurations where multifd device state transfer is supported
> > > +     * in order to perform asynchronous transmission of the remaining 
> > > data in
> > > +     * parallel with @save_live_complete_precopy handlers.
> > > +     * When postcopy is enabled, devices that support postcopy will skip 
> > > this
> > > +     * step.
> > > +     *
> > > +     * @d: a #SaveLiveCompletePrecopyThreadData containing parameters 
> > > that the
> > > +     * handler may need, including this device section idstr and 
> > > instance_id,
> > > +     * and opaque data pointer passed to register_savevm_live().
> > > +     * @errp: pointer to Error*, to store an error if it happens.
> > > +     *
> > > +     * Returns true to indicate success and false for errors.
> > > +     */
> > > +    SaveLiveCompletePrecopyThreadHandler 
> > > save_live_complete_precopy_thread;
> > > +
> > >       /* This runs both outside and inside the BQL.  */
> > >       /**
> > > diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
> > > index fd23ff7771b1..42ed4e6be150 100644
> > > --- a/include/qemu/typedefs.h
> > > +++ b/include/qemu/typedefs.h
> > > @@ -108,6 +108,7 @@ typedef struct QString QString;
> > >   typedef struct RAMBlock RAMBlock;
> > >   typedef struct Range Range;
> > >   typedef struct ReservedRegion ReservedRegion;
> > > +typedef struct SaveLiveCompletePrecopyThreadData 
> > > SaveLiveCompletePrecopyThreadData;
> > >   typedef struct SHPCDevice SHPCDevice;
> > >   typedef struct SSIBus SSIBus;
> > >   typedef struct TCGCPUOps TCGCPUOps;
> > > @@ -133,5 +134,7 @@ typedef struct IRQState *qemu_irq;
> > >   typedef void (*qemu_irq_handler)(void *opaque, int n, int level);
> > >   typedef bool (*MigrationLoadThread)(void *opaque, bool *should_quit,
> > >                                       Error **errp);
> > > +typedef bool 
> > > (*SaveLiveCompletePrecopyThreadHandler)(SaveLiveCompletePrecopyThreadData 
> > > *d,
> > > +                                                     Error **errp);
> > >   #endif /* QEMU_TYPEDEFS_H */
> > > diff --git a/migration/multifd-device-state.c 
> > > b/migration/multifd-device-state.c
> > > index 5de3cf27d6e8..63f021fb8dad 100644
> > > --- a/migration/multifd-device-state.c
> > > +++ b/migration/multifd-device-state.c
> > > @@ -8,7 +8,10 @@
> > >    */
> > >   #include "qemu/osdep.h"
> > > +#include "qapi/error.h"
> > >   #include "qemu/lockable.h"
> > > +#include "block/thread-pool.h"
> > > +#include "migration.h"
> > >   #include "migration/misc.h"
> > >   #include "multifd.h"
> > >   #include "options.h"
> > > @@ -17,6 +20,9 @@ static struct {
> > >       QemuMutex queue_job_mutex;
> > >       MultiFDSendData *send_data;
> > > +
> > > +    ThreadPool *threads;
> > > +    bool threads_abort;
> > >   } *multifd_send_device_state;
> > >   void multifd_device_state_send_setup(void)
> > > @@ -27,10 +33,14 @@ void multifd_device_state_send_setup(void)
> > >       qemu_mutex_init(&multifd_send_device_state->queue_job_mutex);
> > >       multifd_send_device_state->send_data = multifd_send_data_alloc();
> > > +
> > > +    multifd_send_device_state->threads = thread_pool_new();
> > > +    multifd_send_device_state->threads_abort = false;
> > >   }
> > >   void multifd_device_state_send_cleanup(void)
> > >   {
> > > +    g_clear_pointer(&multifd_send_device_state->threads, 
> > > thread_pool_free);
> > >       g_clear_pointer(&multifd_send_device_state->send_data,
> > >                       multifd_send_data_free);
> > > @@ -115,3 +125,78 @@ bool multifd_device_state_supported(void)
> > >       return migrate_multifd() && !migrate_mapped_ram() &&
> > >           migrate_multifd_compression() == MULTIFD_COMPRESSION_NONE;
> > >   }
> > > +
> > > +static void multifd_device_state_save_thread_data_free(void *opaque)
> > > +{
> > > +    SaveLiveCompletePrecopyThreadData *data = opaque;
> > > +
> > > +    g_clear_pointer(&data->idstr, g_free);
> > > +    g_free(data);
> > > +}
> > > +
> > > +static int multifd_device_state_save_thread(void *opaque)
> > > +{
> > > +    SaveLiveCompletePrecopyThreadData *data = opaque;
> > > +    g_autoptr(Error) local_err = NULL;
> > > +
> > > +    if (!data->hdlr(data, &local_err)) {
> > > +        MigrationState *s = migrate_get_current();
> > > +
> > > +        assert(local_err);
> > > +
> > > +        /*
> > > > +         * If multiple save threads fail, which thread's error ends up
> > > > +         * being set is purely arbitrary.
> > > +         */
> > > +        migrate_set_error(s, local_err);
> > 
> > Where do you kick out all the threads when one hits an error?  I wonder if
> > migrate_set_error() should just set quit flag for everything, but for this
> > series it might be easier to use multifd_abort_device_state_save_threads().
> 
> I've now added a call to multifd_abort_device_state_save_threads() if a
> migration error is already set, to avoid needlessly waiting for the
> remaining threads to do all of their work.

With that, feel free to take:

Reviewed-by: Peter Xu <pet...@redhat.com>

-- 
Peter Xu


Reply via email to