On Mon, Nov 27, 2023 at 05:26:00PM -0300, Fabiano Rosas wrote:
> Currently multifd does not need to have knowledge of pages on the
> receiving side because all the information needed is within the
> packets that come in the stream.
>
> We're about to add support to fixed-ram migration, which cannot use
> packets because it expects the ramblock section in the migration file
> to contain only the guest pages data.
>
> Add a data structure to transfer pages between the ram migration code
> and the multifd receiving threads.
>
> We don't want to reuse MultiFDPages_t for two reasons:
>
> a) multifd threads don't really need to know about the data they're
>    receiving.
>
> b) the receiving side has to be stopped to load the pages, which means
>    we can experiment with larger granularities than page size when
>    transferring data.
>
> Signed-off-by: Fabiano Rosas <faro...@suse.de>
> ---
> - stopped using MultiFDPages_t and added a new structure which can
>   take offset + size
> ---
>  migration/multifd.c | 122 ++++++++++++++++++++++++++++++++++++++++++--
>  migration/multifd.h |  20 ++++++++
>  2 files changed, 138 insertions(+), 4 deletions(-)
>
> diff --git a/migration/multifd.c b/migration/multifd.c
> index c1381bdc21..7dfab2367a 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -142,17 +142,36 @@ static void nocomp_recv_cleanup(MultiFDRecvParams *p)
>  static int nocomp_recv_data(MultiFDRecvParams *p, Error **errp)
>  {
>      uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
> +    ERRP_GUARD();
>
>      if (flags != MULTIFD_FLAG_NOCOMP) {
>          error_setg(errp, "multifd %u: flags received %x flags expected %x",
>                     p->id, flags, MULTIFD_FLAG_NOCOMP);
>          return -1;
>      }
> -    for (int i = 0; i < p->normal_num; i++) {
> -        p->iov[i].iov_base = p->host + p->normal[i];
> -        p->iov[i].iov_len = p->page_size;
> +
> +    if (!migrate_multifd_packets()) {
> +        MultiFDRecvData *data = p->data;
> +        size_t ret;
> +
> +        ret = qio_channel_pread(p->c, (char *) data->opaque,
> +                                data->size, data->file_offset, errp);
> +        if (ret != data->size) {
> +            error_prepend(errp,
> +                          "multifd recv (%u): read 0x%zx, expected 0x%zx",
> +                          p->id, ret, data->size);
> +            return -1;
> +        }
> +
> +        return 0;
> +    } else {
> +        for (int i = 0; i < p->normal_num; i++) {
> +            p->iov[i].iov_base = p->host + p->normal[i];
> +            p->iov[i].iov_len = p->page_size;
> +        }
> +
> +        return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
>      }
I guess you managed to squash the file loads into the "no compression"
handler of multifd, but IMHO it's not as clean.

Firstly, if we do so, we'd better make sure multifd-compression is never
enabled together with fixed-ram. I didn't see such protection in the
series yet. I think if that combination happens we should expect
crashes, because the file loads will go into the zlib/zstd paths.

IMHO the only part fixed-ram can share with multifd is the task
management: mutexes, semaphores, etc. IIRC I mentioned before that it
would be nice to simply have a pool of threads we can enqueue tasks to.
If that's too far away, would something like the below be closer to
that?

What I'm thinking:

  - patch 1: rename MultiFDMethods to MultiFDCompressMethods; this can
    replace the other patch that does s/recv_pages/recv_data/

  - patch 2: introduce MultiFDMethods (on top of MultiFDCompressMethods),
    refactoring the current code to provide the socket version of
    MultiFDMethods

  - patch 3: add the fixed-ram "file" version of MultiFDMethods

MultiFDCompressMethods doesn't need to be used at all for the "file"
version of MultiFDMethods.

Would that work?

> -    return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
>  }
>
>  static MultiFDMethods multifd_nocomp_ops = {
> @@ -989,6 +1008,7 @@ int multifd_save_setup(Error **errp)
>
>  struct {
>      MultiFDRecvParams *params;
> +    MultiFDRecvData *data;

(If the above would work, maybe we can split MultiFDRecvParams into two
chunks: one commonly used by both, one only for sockets?)

>      /* number of created threads */
>      int count;
>      /* syncs main thread and channels */
> @@ -999,6 +1019,49 @@ struct {
>      MultiFDMethods *ops;
>  } *multifd_recv_state;
>
> +int multifd_recv(void)
> +{
> +    int i;
> +    static int next_recv_channel;
> +    MultiFDRecvParams *p = NULL;
> +    MultiFDRecvData *data = multifd_recv_state->data;
> +
> +    /*
> +     * next_channel can remain from a previous migration that was
> +     * using more channels, so ensure it doesn't overflow if the
> +     * limit is lower now.
> +     */
> +    next_recv_channel %= migrate_multifd_channels();
> +    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
> +        p = &multifd_recv_state->params[i];
> +
> +        qemu_mutex_lock(&p->mutex);
> +        if (p->quit) {
> +            error_report("%s: channel %d has already quit!", __func__, i);
> +            qemu_mutex_unlock(&p->mutex);
> +            return -1;
> +        }
> +        if (!p->pending_job) {
> +            p->pending_job++;
> +            next_recv_channel = (i + 1) % migrate_multifd_channels();
> +            break;
> +        }
> +        qemu_mutex_unlock(&p->mutex);
> +    }
> +    assert(p->data->size == 0);
> +    multifd_recv_state->data = p->data;
> +    p->data = data;
> +    qemu_mutex_unlock(&p->mutex);
> +    qemu_sem_post(&p->sem);
> +
> +    return 1;
> +}

PS: with the pool model we could already mostly merge the code above
with multifd_send_pages(), because it would become a common helper that
enqueues a task to the pool, no matter whether the task is for writing
(to a file/socket) or reading (only from a file).

> +
> +MultiFDRecvData *multifd_get_recv_data(void)
> +{
> +    return multifd_recv_state->data;
> +}
> +
>  static void multifd_recv_terminate_threads(Error *err)
>  {
>      int i;
> @@ -1020,6 +1083,7 @@ static void multifd_recv_terminate_threads(Error *err)
>
>          qemu_mutex_lock(&p->mutex);
>          p->quit = true;
> +        qemu_sem_post(&p->sem);
>          /*
>           * We could arrive here for two reasons:
>           *  - normal quit, i.e. everything went fine, just finished
> @@ -1069,6 +1133,7 @@ void multifd_load_cleanup(void)
>          p->c = NULL;
>          qemu_mutex_destroy(&p->mutex);
>          qemu_sem_destroy(&p->sem_sync);
> +        qemu_sem_destroy(&p->sem);
>          g_free(p->name);
>          p->name = NULL;
>          p->packet_len = 0;
> @@ -1083,6 +1148,8 @@ void multifd_load_cleanup(void)
>      qemu_sem_destroy(&multifd_recv_state->sem_sync);
>      g_free(multifd_recv_state->params);
>      multifd_recv_state->params = NULL;
> +    g_free(multifd_recv_state->data);
> +    multifd_recv_state->data = NULL;
>      g_free(multifd_recv_state);
>      multifd_recv_state = NULL;
>  }
> @@ -1094,6 +1161,21 @@ void multifd_recv_sync_main(void)
>      if (!migrate_multifd() || !migrate_multifd_packets()) {    [1]
>          return;
>      }
> +
> +    if (!migrate_multifd_packets()) {

Hmm, isn't this checked already above at [1]? Could this path ever
trigger then? Maybe we need to drop the one at [1]?

IIUC what you want to do here is rely on the last RAM_SAVE_FLAG_EOS in
the image file to do a full flush, making sure all pages are loaded.
You may want to be careful about the side effect of the
flush_after_each_section parameter:

        case RAM_SAVE_FLAG_EOS:
            /* normal exit */
            if (migrate_multifd() &&
                migrate_multifd_flush_after_each_section()) {
                multifd_recv_sync_main();
            }

You may want to always flush for files?

> +        for (i = 0; i < migrate_multifd_channels(); i++) {
> +            MultiFDRecvParams *p = &multifd_recv_state->params[i];
> +
> +            qemu_sem_post(&p->sem);
> +            qemu_sem_wait(&p->sem_sync);
> +
> +            qemu_mutex_lock(&p->mutex);
> +            assert(!p->pending_job || p->quit);
> +            qemu_mutex_unlock(&p->mutex);
> +        }
> +        return;

Btw, how does this kick off all the recv threads? Is it because you did
a sem_post(&sem) with p->pending_job==false this time? Maybe it's
clearer to just set p->quit (or a global quit knob) somewhere? That
would make it clear that this is a one-shot thing, only needed at the
end of the file incoming migration.
> +    }
> +
>      for (i = 0; i < migrate_multifd_channels(); i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>
> @@ -1156,6 +1238,18 @@ static void *multifd_recv_thread(void *opaque)
>
>              p->total_normal_pages += p->normal_num;
>              has_data = !!p->normal_num;
> +        } else {
> +            /*
> +             * No packets, so we need to wait for the vmstate code to
> +             * give us work.
> +             */
> +            qemu_sem_wait(&p->sem);
> +            qemu_mutex_lock(&p->mutex);
> +            if (!p->pending_job) {
> +                qemu_mutex_unlock(&p->mutex);
> +                break;
> +            }
> +            has_data = !!p->data->size;
>          }
>
>          qemu_mutex_unlock(&p->mutex);
> @@ -1171,6 +1265,17 @@ static void *multifd_recv_thread(void *opaque)
>              qemu_sem_post(&multifd_recv_state->sem_sync);
>              qemu_sem_wait(&p->sem_sync);
>          }
> +
> +        if (!use_packets) {
> +            qemu_mutex_lock(&p->mutex);
> +            p->data->size = 0;
> +            p->pending_job--;
> +            qemu_mutex_unlock(&p->mutex);
> +        }
> +    }
> +
> +    if (!use_packets) {
> +        qemu_sem_post(&p->sem_sync);

Currently sem_sync is only posted with the MULTIFD_FLAG_SYNC flag. We'd
better be careful when reusing it. Maybe add a comment above
recv_state->sem_sync?

/*
 * For sockets: this is posted once for each MULTIFD_FLAG_SYNC flag.
 *
 * For files: this is only posted at the end of the file load to mark
 * completion of the load process.
 */

>      }
>
>      if (local_err) {
> @@ -1205,6 +1310,10 @@ int multifd_load_setup(Error **errp)
>      thread_count = migrate_multifd_channels();
>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
> +
> +    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
> +    multifd_recv_state->data->size = 0;
> +
>      qatomic_set(&multifd_recv_state->count, 0);
>      qemu_sem_init(&multifd_recv_state->sem_sync, 0);
>      multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
> @@ -1214,9 +1323,14 @@ int multifd_load_setup(Error **errp)
>
>          qemu_mutex_init(&p->mutex);
>          qemu_sem_init(&p->sem_sync, 0);
> +        qemu_sem_init(&p->sem, 0);
>          p->quit = false;
> +        p->pending_job = 0;
>          p->id = i;
>
> +        p->data = g_new0(MultiFDRecvData, 1);
> +        p->data->size = 0;
> +
>          if (use_packets) {
>              p->packet_len = sizeof(MultiFDPacket_t)
>                            + sizeof(uint64_t) * page_count;
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 406d42dbae..abaf16c3f2 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -13,6 +13,8 @@
>  #ifndef QEMU_MIGRATION_MULTIFD_H
>  #define QEMU_MIGRATION_MULTIFD_H
>
> +typedef struct MultiFDRecvData MultiFDRecvData;
> +
>  int multifd_save_setup(Error **errp);
>  void multifd_save_cleanup(void);
>  int multifd_load_setup(Error **errp);
> @@ -24,6 +26,8 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
>  void multifd_recv_sync_main(void);
>  int multifd_send_sync_main(QEMUFile *f);
>  int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset);
> +int multifd_recv(void);
> +MultiFDRecvData *multifd_get_recv_data(void);
>
>  /* Multifd Compression flags */
>  #define MULTIFD_FLAG_SYNC (1 << 0)
> @@ -66,6 +70,13 @@ typedef struct {
>      RAMBlock *block;
>  } MultiFDPages_t;
>
> +struct MultiFDRecvData {
> +    void *opaque;
> +    size_t size;
> +    /* for preadv */
> +    off_t file_offset;
> +};
> +
>  typedef struct {
>      /* Fields are only written at creating/deletion time */
>      /* No lock required for them, they are read only */
> @@ -156,6 +167,8 @@ typedef struct {
>
>      /* syncs main thread and channels */
>      QemuSemaphore sem_sync;
> +    /* sem where to wait for more work */
> +    QemuSemaphore sem;
>
>      /* this mutex protects the following parameters */
>      QemuMutex mutex;
> @@ -167,6 +180,13 @@ typedef struct {
>      uint32_t flags;
>      /* global number of generated multifd packets */
>      uint64_t packet_num;
> +    int pending_job;
> +    /*
> +     * The owner of 'data' depends of 'pending_job' value:
> +     * pending_job == 0 -> migration_thread can use it.
> +     * pending_job != 0 -> multifd_channel can use it.
> +     */
> +    MultiFDRecvData *data;

Right after the main thread assigns a chunk of memory for a recv thread
to load, the main thread's job is done, afaict. I don't see how a race
could happen here. I'm not sure, but I _think_ that if we rely on
p->quit (or something similar) to quit all the recv threads, then this
can be dropped?

>
>      /* thread local variables. No locking required */
>
> --
> 2.35.3
>

-- 
Peter Xu