From: "Maciej S. Szmigiero" <maciej.szmigi...@oracle.com> This SaveVMHandler helps device provide its own asynchronous transmission of the remaining data at the end of a precopy phase via multifd channels, in parallel with the transfer done by save_live_complete_precopy handlers.
diff --git a/include/migration/misc.h b/include/migration/misc.h
index 189de6d02ad6..26f7f3140f03 100644
--- a/include/migration/misc.h
+++ b/include/migration/misc.h
@@ -116,4 +116,14 @@ bool multifd_queue_device_state(char *idstr, uint32_t instance_id,
                                 char *data, size_t len);
 bool migration_has_device_state_support(void);
 
+void
+multifd_spawn_device_state_save_thread(SaveLiveCompletePrecopyThreadHandler hdlr,
+                                       char *idstr, uint32_t instance_id,
+                                       void *opaque);
+
+void multifd_launch_device_state_save_threads(int max_count);
+
+void multifd_abort_device_state_save_threads(void);
+int multifd_join_device_state_save_threads(void);
+
 #endif
diff --git a/include/migration/register.h b/include/migration/register.h
index 44d8cf5192ae..ace2cfc0f75e 100644
--- a/include/migration/register.h
+++ b/include/migration/register.h
@@ -139,6 +139,31 @@ typedef struct SaveVMHandlers {
      */
     int (*save_live_complete_precopy_end)(QEMUFile *f, void *opaque);
 
+    /* This runs in a separate thread. */
+
+    /**
+     * @save_live_complete_precopy_thread
+     *
+     * Called at the end of a precopy phase from a separate worker thread
+     * in configurations where multifd device state transfer is supported
+     * in order to perform asynchronous transmission of the remaining data in
+     * parallel with @save_live_complete_precopy handlers.
+     * The call happens after all @save_live_complete_precopy_begin handlers
+     * have finished.
+     * When postcopy is enabled, devices that support postcopy will skip this
+     * step.
+     *
+     * @idstr: this device section idstr
+     * @instance_id: this device section instance_id
+     * @abort_flag: flag indicating that the migration core wants to abort
+     * the transmission and so the handler should exit ASAP. To be read by
+     * qatomic_read() or similar.
+     * @opaque: data pointer passed to register_savevm_live()
+     *
+     * Returns zero to indicate success and negative for error
+     */
+    SaveLiveCompletePrecopyThreadHandler save_live_complete_precopy_thread;
+
     /* This runs both outside and inside the BQL. */
 
     /**
diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
index 9d222dc37628..edd6e7b9c116 100644
--- a/include/qemu/typedefs.h
+++ b/include/qemu/typedefs.h
@@ -130,5 +130,9 @@ typedef struct IRQState *qemu_irq;
  * Function types
  */
 typedef void (*qemu_irq_handler)(void *opaque, int n, int level);
+typedef int (*SaveLiveCompletePrecopyThreadHandler)(char *idstr,
+                                                    uint32_t instance_id,
+                                                    bool *abort_flag,
+                                                    void *opaque);
 
 #endif /* QEMU_TYPEDEFS_H */
diff --git a/migration/multifd-device-state.c b/migration/multifd-device-state.c
index 7b34fe736c7f..9b364e8ef33c 100644
--- a/migration/multifd-device-state.c
+++ b/migration/multifd-device-state.c
@@ -9,12 +9,17 @@
 
 #include "qemu/osdep.h"
 #include "qemu/lockable.h"
+#include "block/thread-pool.h"
 #include "migration/misc.h"
 #include "multifd.h"
 #include "options.h"
 
 static QemuMutex queue_job_mutex;
 
+ThreadPool *send_threads;
+int send_threads_ret;
+bool send_threads_abort;
+
 static MultiFDSendData *device_state_send;
 
 size_t multifd_device_state_payload_size(void)
@@ -27,6 +32,10 @@ void multifd_device_state_save_setup(void)
     qemu_mutex_init(&queue_job_mutex);
 
     device_state_send = multifd_send_data_alloc();
+
+    send_threads = thread_pool_new(NULL);
+    send_threads_ret = 0;
+    send_threads_abort = false;
 }
 
 void multifd_device_state_clear(MultiFDDeviceState_t *device_state)
@@ -37,6 +46,7 @@ void multifd_device_state_clear(MultiFDDeviceState_t *device_state)
 
 void multifd_device_state_save_cleanup(void)
 {
+    g_clear_pointer(&send_threads, thread_pool_free);
     g_clear_pointer(&device_state_send, multifd_send_data_free);
 
     qemu_mutex_destroy(&queue_job_mutex);
@@ -104,3 +114,80 @@ bool migration_has_device_state_support(void)
     return migrate_multifd() && !migrate_mapped_ram() &&
         migrate_multifd_compression() == MULTIFD_COMPRESSION_NONE;
 }
+
+static void multifd_device_state_save_thread_complete(void *opaque, int ret)
+{
+    if (ret && !send_threads_ret) {
+        send_threads_ret = ret;
+    }
+}
+
+struct MultiFDDSSaveThreadData {
+    SaveLiveCompletePrecopyThreadHandler hdlr;
+    char *idstr;
+    uint32_t instance_id;
+    void *opaque;
+};
+
+static void multifd_device_state_save_thread_data_free(void *opaque)
+{
+    struct MultiFDDSSaveThreadData *data = opaque;
+
+    g_clear_pointer(&data->idstr, g_free);
+    g_free(data);
+}
+
+static int multifd_device_state_save_thread(void *opaque)
+{
+    struct MultiFDDSSaveThreadData *data = opaque;
+
+    return data->hdlr(data->idstr, data->instance_id, &send_threads_abort,
+                      data->opaque);
+}
+
+void
+multifd_spawn_device_state_save_thread(SaveLiveCompletePrecopyThreadHandler hdlr,
+                                       char *idstr, uint32_t instance_id,
+                                       void *opaque)
+{
+    struct MultiFDDSSaveThreadData *data;
+
+    assert(migration_has_device_state_support());
+
+    data = g_new(struct MultiFDDSSaveThreadData, 1);
+    data->hdlr = hdlr;
+    data->idstr = g_strdup(idstr);
+    data->instance_id = instance_id;
+    data->opaque = opaque;
+
+    thread_pool_submit(send_threads,
+                       multifd_device_state_save_thread,
+                       data, multifd_device_state_save_thread_data_free,
+                       multifd_device_state_save_thread_complete, NULL);
+}
+
+void multifd_launch_device_state_save_threads(int max_count)
+{
+    assert(migration_has_device_state_support());
+
+    thread_pool_set_minmax_threads(send_threads,
+                                   0, max_count);
+
+    thread_pool_poll(send_threads);
+}
+
+void multifd_abort_device_state_save_threads(void)
+{
+    assert(migration_has_device_state_support());
+
+    qatomic_set(&send_threads_abort, true);
+}
+
+int multifd_join_device_state_save_threads(void)
+{
+    assert(migration_has_device_state_support());
+
+    thread_pool_join(send_threads);
+
+    return send_threads_ret;
+}
diff --git a/migration/savevm.c b/migration/savevm.c
index 33c9200d1e78..a70f6ed006f2 100644
--- a/migration/savevm.c
+++ b/migration/savevm.c
@@ -1495,6 +1495,7 @@ int qemu_savevm_state_complete_precopy_iterable(QEMUFile *f, bool in_postcopy)
     int64_t start_ts_each, end_ts_each;
     SaveStateEntry *se;
     int ret;
+    bool multifd_device_state = migration_has_device_state_support();
 
     QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
         if (!se->ops ||
             (in_postcopy && se->ops->has_postcopy &&
@@ -1517,6 +1518,27 @@ int qemu_savevm_state_complete_precopy_iterable(QEMUFile *f, bool in_postcopy)
         }
     }
 
+    if (multifd_device_state) {
+        int thread_count = 0;
+
+        QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
+            SaveLiveCompletePrecopyThreadHandler hdlr;
+
+            if (!se->ops || (in_postcopy && se->ops->has_postcopy &&
+                             se->ops->has_postcopy(se->opaque)) ||
+                !se->ops->save_live_complete_precopy_thread) {
+                continue;
+            }
+
+            hdlr = se->ops->save_live_complete_precopy_thread;
+            multifd_spawn_device_state_save_thread(hdlr,
+                                                   se->idstr, se->instance_id,
+                                                   se->opaque);
+            thread_count++;
+        }
+        multifd_launch_device_state_save_threads(thread_count);
+    }
+
     QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
         if (!se->ops ||
             (in_postcopy && se->ops->has_postcopy &&
@@ -1541,13 +1563,21 @@ int qemu_savevm_state_complete_precopy_iterable(QEMUFile *f, bool in_postcopy)
         save_section_footer(f, se);
         if (ret < 0) {
             qemu_file_set_error(f, ret);
-            return -1;
+            goto ret_fail_abort_threads;
         }
         end_ts_each = qemu_clock_get_us(QEMU_CLOCK_REALTIME);
         trace_vmstate_downtime_save("iterable", se->idstr, se->instance_id,
                                     end_ts_each - start_ts_each);
     }
 
+    if (multifd_device_state) {
+        ret = multifd_join_device_state_save_threads();
+        if (ret) {
+            qemu_file_set_error(f, ret);
+            return -1;
+        }
+    }
+
     QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
         if (!se->ops ||
             (in_postcopy && se->ops->has_postcopy &&
              se->ops->has_postcopy(se->opaque)) ||
@@ -1565,6 +1595,14 @@ int qemu_savevm_state_complete_precopy_iterable(QEMUFile *f, bool in_postcopy)
     trace_vmstate_downtime_checkpoint("src-iterable-saved");
 
     return 0;
+
+ret_fail_abort_threads:
+    if (multifd_device_state) {
+        multifd_abort_device_state_save_threads();
+        multifd_join_device_state_save_threads();
+    }
+
+    return -1;
 }
 
 int qemu_savevm_state_complete_precopy_non_iterable(QEMUFile *f,
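For completeness, also not part of the patch: a sketch of how a device
would hook the new callback into its SaveVMHandlers registration. The
my_dev_* handler names and MyDevState are made up; only the
.save_live_complete_precopy_thread field and register_savevm_live() are
from the tree plus this series.

#include "qemu/osdep.h"
#include "migration/register.h"

/* Hypothetical wiring, not part of this patch. */
static const SaveVMHandlers savevm_my_dev_handlers = {
    .save_setup = my_dev_save_setup,                      /* made up */
    .save_live_iterate = my_dev_save_iterate,             /* made up */
    .save_live_complete_precopy = my_dev_save_complete,   /* made up */
    /*
     * New in this patch: transmit the bulk of the final device state
     * from a worker thread, in parallel with the main-channel
     * save_live_complete_precopy handlers.
     */
    .save_live_complete_precopy_thread = my_dev_save_complete_precopy_thread,
};

static void my_dev_register_savevm(MyDevState *s)         /* made up */
{
    register_savevm_live("my-dev", 0, 1, &savevm_my_dev_handlers, s);
}

No per-device threading code is needed beyond this: the spawn/launch/
join/abort sequence in savevm.c and multifd-device-state.c above drives
the handler for every device that sets the field.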