Peter Xu <pet...@redhat.com> writes:

> On Tue, Jan 16, 2024 at 05:25:03PM -0300, Fabiano Rosas wrote:
>> Peter Xu <pet...@redhat.com> writes:
>>
>> > On Mon, Nov 27, 2023 at 05:26:00PM -0300, Fabiano Rosas wrote:
>> >> Currently multifd does not need to have knowledge of pages on the
>> >> receiving side because all the information needed is within the
>> >> packets that come in the stream.
>> >>
>> >> We're about to add support to fixed-ram migration, which cannot use
>> >> packets because it expects the ramblock section in the migration file
>> >> to contain only the guest pages data.
>> >>
>> >> Add a data structure to transfer pages between the ram migration code
>> >> and the multifd receiving threads.
>> >>
>> >> We don't want to reuse MultiFDPages_t for two reasons:
>> >>
>> >> a) multifd threads don't really need to know about the data they're
>> >> receiving.
>> >>
>> >> b) the receiving side has to be stopped to load the pages, which means
>> >> we can experiment with larger granularities than page size when
>> >> transferring data.
>> >>
>> >> Signed-off-by: Fabiano Rosas <faro...@suse.de>
>> >> ---
>> >> - stopped using MultiFDPages_t and added a new structure which can
>> >> take offset + size
>> >> ---
>> >>  migration/multifd.c | 122 ++++++++++++++++++++++++++++++++++++++++++--
>> >>  migration/multifd.h |  20 ++++++++
>> >>  2 files changed, 138 insertions(+), 4 deletions(-)
>> >>
>> >> diff --git a/migration/multifd.c b/migration/multifd.c
>> >> index c1381bdc21..7dfab2367a 100644
>> >> --- a/migration/multifd.c
>> >> +++ b/migration/multifd.c
>> >> @@ -142,17 +142,36 @@ static void nocomp_recv_cleanup(MultiFDRecvParams *p)
>> >>  static int nocomp_recv_data(MultiFDRecvParams *p, Error **errp)
>> >>  {
>> >>      uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
>> >> +    ERRP_GUARD();
>> >>
>> >>      if (flags != MULTIFD_FLAG_NOCOMP) {
>> >>          error_setg(errp, "multifd %u: flags received %x flags expected %x",
>> >>                     p->id, flags, MULTIFD_FLAG_NOCOMP);
>> >>          return -1;
>> >>      }
>> >> -    for (int i = 0; i < p->normal_num; i++) {
>> >> -        p->iov[i].iov_base = p->host + p->normal[i];
>> >> -        p->iov[i].iov_len = p->page_size;
>> >> +
>> >> +    if (!migrate_multifd_packets()) {
>> >> +        MultiFDRecvData *data = p->data;
>> >> +        size_t ret;
>> >> +
>> >> +        ret = qio_channel_pread(p->c, (char *) data->opaque,
>> >> +                                data->size, data->file_offset, errp);
>> >> +        if (ret != data->size) {
>> >> +            error_prepend(errp,
>> >> +                          "multifd recv (%u): read 0x%zx, expected 0x%zx",
>> >> +                          p->id, ret, data->size);
>> >> +            return -1;
>> >> +        }
>> >> +
>> >> +        return 0;
>> >> +    } else {
>> >> +        for (int i = 0; i < p->normal_num; i++) {
>> >> +            p->iov[i].iov_base = p->host + p->normal[i];
>> >> +            p->iov[i].iov_len = p->page_size;
>> >> +        }
>> >> +
>> >> +        return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
>> >>      }
>> >
>> > I guess you managed to squash the file loads into "no compression" handler
>> > of multifd, but IMHO it's not as clean.
>> >
>> > Firstly, if to do so, we'd better make sure multifd-compression is not
>> > enabled anywhere together with fixed-ram. I didn't yet see such protection
>> > in the series. I think if it happens we should expect crashes because
>> > they'll go into zlib/zstd paths for the file.
>>
>> Yes, we need some checks around this.
>>
>> >
>> > IMHO the only model fixed-ram can share with multifd is the task management
>> > part, mutexes, semaphores, etc..
>>
>> AFAIU, that's what multifd *is*.
>> Compression would just be another
>> client of this task management code. This "nocomp" thing always felt off
>> to me.
>>
>> > IIRC I used to mention that it'll be nice
>> > if we have simply a pool of threads so we can enqueue tasks.
>>
>> Right, I don't disagree. However I don't think we can just show up with
>> a thread pool and start moving stuff into it. I think the safest way to
>> do this is to:
>>
>> 1- Adapt multifd so that the client code is the sole responsible for the
>> data being sent. No data knowledge by the multifd thread.
>>
>> With this, nothing should need to touch multifd threads code
>> anymore. New clients just define their methods and prepare the data
>> as they please.
>>
>> 2- Move everything that is left into multifd. Zero pages, postcopy, etc.
>>
>> With 1 and 2 we'd have a pretty good picture of what kinds of operations
>> we need to do and what are the requirements for the thread
>> infrastructure.
>>
>> 3- Try to use existing abstractions within QEMU to replace
>> multifd. Write from scratch if none are suitable.
>>
>> What do you think? We could put an action plan in place and start
>> picking at it. My main concern is about what sorts of hidden assumptions
>> are present in the current code that we'd start discovering if we just
>> replaced it with something new.
>
> You plan sounds good. Generalization (3) can happen even before (2), IMHO.
>
> I suppose you already have the wiki account working now, would you please
> add an entry into the todo page, with all these thoughts?
>
> https://wiki.qemu.org/ToDo/LiveMigration
>
> You can also mention you plan to look into it if you're taking the lead,
> then people know it's in progress.
>
> It can be under "cleanups" I assume.
>
>>
>> > If that's too
>> > far away, would something like below closer to that? What I'm thinking:
>> >
>> > - patch 1: rename MultiFDMethods to MultiFDCompressMethods, this can
>> > replace the other patch to do s/recv_pages/recv_data/
>> >
>> > - patch 2: introduce MultiFDMethods (on top of MultiFDCompressMethods),
>> > refactor the current code to provide the socket version of
>> > MultiFDMethods.
>> >
>> > - patch 3: add the fixed-ram "file" version of MultiFDMethods
>>
>> We also have zero page moving to multifd and compression accelerators
>> being developed. We need to take those into account. We might need an
>> ops structure that accounts for the current "phases" (setup, prepare,
>> recv, cleanup)[1], but within those also allow for composing arbitrary
>> data transformations.
>>
>> (1)- there's no equivalent to send_prepare on dst and no equivalent to
>> recv_pages on src. We might need to add a recv_prepare and a send_pages
>> hook. The fixed-ram migration for instance would benefit from being able
>> to choose a different IO function to send data.
>>
>> I'll send the patches moving zero pages to multifd once I find some
>> time, but another question I had was whether we should add zero page
>> detection as a new phase: setup - zero page detection - prepare - send -
>> cleanup.
>
> As you know I haven't yet followed those threads, only a rough memory on
> the zero page movement but that can be obsolete and I'll need to read what
> you have. I agree all these are multifd-relevant, and we should consider
> them.
>
> Now the question is even if we will have a good thread model for multifd,
> whether file operations should be put into the compression layer (what you
> already did in this patch) or move it out.
> Libvirt supports compression on
> images so I assume file operations shouldn't need to rely on compression,
> from that pov I think maybe it's better we leave all compression stuff
> along from file: perspective.
>
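To make the ops split we've been discussing a bit more concrete, here is
roughly the shape I have in mind. This is only a sketch, all of the names
below are made up for illustration and none of this exists in the tree
(Error is the usual qapi/error.h type):

    typedef struct MultiFDSendParams MultiFDSendParams;
    typedef struct MultiFDRecvParams MultiFDRecvParams;

    /* Per-transport hooks: one implementation for sockets, one for file. */
    typedef struct {
        int (*send_setup)(MultiFDSendParams *p, Error **errp);
        int (*send_prepare)(MultiFDSendParams *p, Error **errp);
        int (*recv_data)(MultiFDRecvParams *p, Error **errp);
        void (*send_cleanup)(MultiFDSendParams *p);
        void (*recv_cleanup)(MultiFDRecvParams *p);
    } MultiFDTransportOps;

    /* Compression hooks: only ever consulted by the socket transport. */
    typedef struct {
        int (*send_prepare)(MultiFDSendParams *p, Error **errp);
        int (*recv_prepare)(MultiFDRecvParams *p, Error **errp);
    } MultiFDCompressOps;

With something along those lines the file: code never sees the zlib/zstd
paths at all.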
I'm thinking we could maybe divide multifd into phases (very similar to
what we have), but fold compression into one or more of those phases. So
we'd have:

    send_prepare:
        if compression:
            ops->compression_prepare()

I'm working on something similar for zero pages. It's currently a 3-patch
series from Juan; it's probably easier if we discuss it there.

>> > MultiFDCompressMethods doesn't need to be used at all for "file" version of
>> > MultiFDMethods.
>> >
>> > Would that work?
>>
>> We definitely need _something_ to help us stop adding code to the middle
>> of multifd_send_thread every time there's a new feature.
>>
>> >
>> >> -    return qio_channel_readv_all(p->c, p->iov, p->normal_num, errp);
>> >>  }
>> >>
>> >>  static MultiFDMethods multifd_nocomp_ops = {
>> >> @@ -989,6 +1008,7 @@ int multifd_save_setup(Error **errp)
>> >>
>> >>  struct {
>> >>      MultiFDRecvParams *params;
>> >> +    MultiFDRecvData *data;
>> >
>> > (If above would work, maybe we can split MultiFDRecvParams into two chunks,
>> > one commonly used for both, one only for sockets?)
>> >
>>
>> If we assume the use of packets in multifd is coupled to the socket
>> channel usage then yes. However, I suspect that what we might want is a
>> streaming migration vs. non-streaming migration abstraction. Because we
>> can still use packets with file migration after all.
>>
>> >>      /* number of created threads */
>> >>      int count;
>> >>      /* syncs main thread and channels */
>> >> @@ -999,6 +1019,49 @@ struct {
>> >>      MultiFDMethods *ops;
>> >>  } *multifd_recv_state;
>> >>
>> >> +int multifd_recv(void)
>> >> +{
>> >> +    int i;
>> >> +    static int next_recv_channel;
>> >> +    MultiFDRecvParams *p = NULL;
>> >> +    MultiFDRecvData *data = multifd_recv_state->data;
>> >> +
>> >> +    /*
>> >> +     * next_channel can remain from a previous migration that was
>> >> +     * using more channels, so ensure it doesn't overflow if the
>> >> +     * limit is lower now.
>> >> +     */
>> >> +    next_recv_channel %= migrate_multifd_channels();
>> >> +    for (i = next_recv_channel;; i = (i + 1) % migrate_multifd_channels()) {
>> >> +        p = &multifd_recv_state->params[i];
>> >> +
>> >> +        qemu_mutex_lock(&p->mutex);
>> >> +        if (p->quit) {
>> >> +            error_report("%s: channel %d has already quit!", __func__, i);
>> >> +            qemu_mutex_unlock(&p->mutex);
>> >> +            return -1;
>> >> +        }
>> >> +        if (!p->pending_job) {
>> >> +            p->pending_job++;
>> >> +            next_recv_channel = (i + 1) % migrate_multifd_channels();
>> >> +            break;
>> >> +        }
>> >> +        qemu_mutex_unlock(&p->mutex);
>> >> +    }
>> >> +    assert(p->data->size == 0);
>> >> +    multifd_recv_state->data = p->data;
>> >> +    p->data = data;
>> >> +    qemu_mutex_unlock(&p->mutex);
>> >> +    qemu_sem_post(&p->sem);
>> >> +
>> >> +    return 1;
>> >> +}
>> >
>> > PS: so if we have the pool model we can already mostly merge above code
>> > with multifd_send_pages().. because this will be a common helper to enqueue
>> > a task to a pool, no matter it's for writting (to file/socket) or reading
>> > (only from file).
>> >
>> >> +
>> >> +MultiFDRecvData *multifd_get_recv_data(void)
>> >> +{
>> >> +    return multifd_recv_state->data;
>> >> +}
>> >> +
>> >>  static void multifd_recv_terminate_threads(Error *err)
>> >>  {
>> >>      int i;
>> >> @@ -1020,6 +1083,7 @@ static void multifd_recv_terminate_threads(Error *err)
>> >>
>> >>          qemu_mutex_lock(&p->mutex);
>> >>          p->quit = true;
>> >> +        qemu_sem_post(&p->sem);
>> >>          /*
>> >>           * We could arrive here for two reasons:
>> >>           *  - normal quit, i.e.
>> >>             everything went fine, just finished
>> >> @@ -1069,6 +1133,7 @@ void multifd_load_cleanup(void)
>> >>          p->c = NULL;
>> >>          qemu_mutex_destroy(&p->mutex);
>> >>          qemu_sem_destroy(&p->sem_sync);
>> >> +        qemu_sem_destroy(&p->sem);
>> >>          g_free(p->name);
>> >>          p->name = NULL;
>> >>          p->packet_len = 0;
>> >> @@ -1083,6 +1148,8 @@ void multifd_load_cleanup(void)
>> >>      qemu_sem_destroy(&multifd_recv_state->sem_sync);
>> >>      g_free(multifd_recv_state->params);
>> >>      multifd_recv_state->params = NULL;
>> >> +    g_free(multifd_recv_state->data);
>> >> +    multifd_recv_state->data = NULL;
>> >>      g_free(multifd_recv_state);
>> >>      multifd_recv_state = NULL;
>> >>  }
>> >> @@ -1094,6 +1161,21 @@ void multifd_recv_sync_main(void)
>> >>      if (!migrate_multifd() || !migrate_multifd_packets()) {
>> >
>> > [1]
>> >
>> >>          return;
>> >>      }
>> >> +
>> >> +    if (!migrate_multifd_packets()) {
>> >
>> > Hmm, isn't this checked already above at [1]? Could this path ever trigger
>> > then? Maybe we need to drop the one at [1]?
>>
>> That was a rebase mistake.
>>
>> >
>> > IIUC what you wanted to do here is relying on the last RAM_SAVE_FLAG_EOS in
>> > the image file to do a full flush to make sure all pages are loaded.
>> >
>>
>> Bear with me if I take it slow with everything below here. The practical
>> effect of changing any of these is causing the threads to go off-sync
>> and that results in a time dependent bug where memory is not properly
>> migrated, which just comes up as an assert in check_guests_ram() in the
>> tests. It gets hard to reproduce and has taken me whole weeks to debug
>> before.
>
> Per-iteration sync_main is needed for file, but only on sender side, IIUC.
>
> Actually it's not needed for your use case at all to move the VM to file,
> because in that case we could already stop the VM first. Now to be
> compatible with Libvirt's sake on live snapshot on Windows, we assume VM
> can run, then yes sync_main is needed at least on src, because otherwise
> the same page can be queued >1 times on different threads, and it's not
> guaranteed that the latest page will always land last.
>
> Said that, I don't think it's needed on recver side? Because for both use
> cases (either "move to file", or "take a snapshot"), the loader is actually
> the same process where we read data from the file and relaunch the VM. In
> that case there's no need to sync.

You're right.

> For socket-based multifd, the sync_main on recver side is only triggered
> when RAM_SAVE_FLAG_MULTIFD_FLUSH packet is received on 9.0 machine type.
> And then you'll also notice you don't even have that for file: URI multifd
> migrations, isn't it?

Right, because socket migration is a stream, so if it's done live the
channels need to sync. File migration takes everything from the file as
is.

> When you said you hit a bug, did you have the sender side sync_main
> available, or you missed both? I would expect that bug triggered because
> you missed the sync_main on src, not on dest. For dest, IMHO we only need
> a last phase sync to make sure all RAM loaded before we relaunch the VM.

You might be right. I hit several different bugs: synchronization issues,
dirty bitmap issues, threads finishing too soon, etc. They all have the
same symptom: one or more pages get "corrupted". So I cannot speak about
any specific one right now.

On that topic, I have played a bit with adding more information to the
file, such as start and end markers for ramblocks or other greppable
markers. Since it is a file, it lends itself much better to these kinds
of debugging helpers than the socket stream does.
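For the record, the kind of helper I have been playing with is trivial.
A rough sketch (not part of this series; the helper name and the marker
format are made up) that drops a greppable marker at a known offset in
the output file:

    #include <stdio.h>
    #include <unistd.h>

    /*
     * Debug-only: write a marker such as "FIXED_RAM_BEGIN:pc.ram" at a
     * known offset (e.g. just before a ramblock's data region), so the
     * file layout can be checked with grep/xxd. pwrite() does not move
     * the file position, so the rest of the save path is unaffected.
     */
    static ssize_t debug_write_ramblock_marker(int fd, const char *tag,
                                               const char *block_name,
                                               off_t offset)
    {
        char marker[128];
        int len = snprintf(marker, sizeof(marker),
                           "FIXED_RAM_%s:%s\n", tag, block_name);

        return pwrite(fd, marker, len, offset);
    }

Nothing fancy, but having markers like that in the file makes it much
easier to see whether a given page's offset lands where it should.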
>
>>
>> > You may want to be careful on the side effect of flush_after_each_section
>> > parameter:
>> >
>> >         case RAM_SAVE_FLAG_EOS:
>> >             /* normal exit */
>> >             if (migrate_multifd() &&
>> >                 migrate_multifd_flush_after_each_section()) {
>> >                 multifd_recv_sync_main();
>> >             }
>> >
>> > You may want to flush always for file?
>>
>> Next patch restricts the setting of flush_after_each_section.
>>
>> >
>> >> +    for (i = 0; i < migrate_multifd_channels(); i++) {
>> >> +        MultiFDRecvParams *p = &multifd_recv_state->params[i];
>> >> +
>> >> +        qemu_sem_post(&p->sem);
>> >> +        qemu_sem_wait(&p->sem_sync);
>> >> +
>> >> +        qemu_mutex_lock(&p->mutex);
>> >> +        assert(!p->pending_job || p->quit);
>> >> +        qemu_mutex_unlock(&p->mutex);
>> >> +    }
>> >> +    return;
>> >
>> > Btw, how does this kick off all the recv threads? Is it because you did a
>> > sem_post(&sem) with p->pending_job==false this time?
>>
>> Yes, when the last piece of memory is received, the thread will loop
>> around and hang at qemu_sem_wait(&p->sem):
>>
>>         } else {
>>             /*
>>              * No packets, so we need to wait for the vmstate code to
>>              * give us work.
>>              */
>>             qemu_sem_wait(&p->sem);
>>             qemu_mutex_lock(&p->mutex);
>>             if (!p->pending_job) {
>>                 qemu_mutex_unlock(&p->mutex);
>>                 break;
>>             }
>>             has_data = !!p->data->size;
>>         }
>>
>> So here we release the p->sem one last time so the thread can see
>> p->pending_job = false and proceed to inform it has finished:
>>
>>         if (!use_packets) {
>>             qemu_sem_post(&p->sem_sync);
>>         }
>
> (see below)
>
>>
>> > Maybe it's clearer to just set p->quit (or a global quite knob) somewhere?
>> > That'll be clear that this is a one-shot thing, only needed at the end of
>> > the file incoming migration.
>> >
>>
>> Maybe I'm not following you, but the thread needs to check
>> p->pending_job before it knows there's no more work. And it can only do
>> that if the migration thread releases p->sem. Do you mean setting
>> p->quit on the thread instead of posting sem_sync? That's racy I think.
>
> I want to make the quit event not rely on pending_job. I think we should
> allow pending_job==false and the thread should just sleep again.

Hmm, this could be a good thing, because then we'd have only one source
of the "quit event" and we'd know all threads would wait until they see
that quit.

> That should also match recver side with sender, I remember we just reworked
> that so as to reference to the global "quit" flag:

Almost. The sender still has the ability to just exit, so even though it
respects the 'exiting' flag, it might have already returned at the end of
the function.

>
> multifd_send_thread():
> ...
>             qemu_sem_wait(&p->sem);
>             if (qatomic_read(&multifd_send_state->exiting)) {
>                 break;
>             }
> ...
>
> Something like that.
>
>>
>> >> +    }
>> >> +
>> >>      for (i = 0; i < migrate_multifd_channels(); i++) {
>> >>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>> >>
>> >> @@ -1156,6 +1238,18 @@ static void *multifd_recv_thread(void *opaque)
>> >>
>> >>          p->total_normal_pages += p->normal_num;
>> >>          has_data = !!p->normal_num;
>> >> +    } else {
>> >> +        /*
>> >> +         * No packets, so we need to wait for the vmstate code to
>> >> +         * give us work.
>> >> +         */
>> >> +        qemu_sem_wait(&p->sem);
>> >> +        qemu_mutex_lock(&p->mutex);
>> >> +        if (!p->pending_job) {
>> >> +            qemu_mutex_unlock(&p->mutex);
>> >> +            break;
>> >> +        }
>> >> +        has_data = !!p->data->size;
>> >>      }
>> >>
>> >>      qemu_mutex_unlock(&p->mutex);
>> >> @@ -1171,6 +1265,17 @@ static void *multifd_recv_thread(void *opaque)
>> >>              qemu_sem_post(&multifd_recv_state->sem_sync);
>> >>              qemu_sem_wait(&p->sem_sync);
>> >>          }
>> >> +
>> >> +        if (!use_packets) {
>> >> +            qemu_mutex_lock(&p->mutex);
>> >> +            p->data->size = 0;
>> >> +            p->pending_job--;
>> >> +            qemu_mutex_unlock(&p->mutex);
>> >> +        }
>> >> +    }
>> >> +
>> >> +    if (!use_packets) {
>> >> +        qemu_sem_post(&p->sem_sync);
>> >
>> > Currently sem_sync is only posted with MULTIFD_FLAG_SYNC flag. We'd better
>> > be careful on reusing it.
>> >
>> > Maybe add some comment above recv_state->sem_sync?
>> >
>> > /*
>> >  * For sockets: this is posted once for each MULTIFD_FLAG_SYNC flag.
>> >  *
>> >  * For files: this is only posted at the end of the file load to mark
>> >  * completion of the load process.
>> >  */
>> >
>>
>> Sure. I would rename it to sem_done if I could, but we already went
>> through that.
>>
>> >>  }
>> >>
>> >>      if (local_err) {
>> >> @@ -1205,6 +1310,10 @@ int multifd_load_setup(Error **errp)
>> >>      thread_count = migrate_multifd_channels();
>> >>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>> >>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
>> >> +
>> >> +    multifd_recv_state->data = g_new0(MultiFDRecvData, 1);
>> >> +    multifd_recv_state->data->size = 0;
>> >> +
>> >>      qatomic_set(&multifd_recv_state->count, 0);
>> >>      qemu_sem_init(&multifd_recv_state->sem_sync, 0);
>> >>      multifd_recv_state->ops = multifd_ops[migrate_multifd_compression()];
>> >> @@ -1214,9 +1323,14 @@ int multifd_load_setup(Error **errp)
>> >>
>> >>          qemu_mutex_init(&p->mutex);
>> >>          qemu_sem_init(&p->sem_sync, 0);
>> >> +        qemu_sem_init(&p->sem, 0);
>> >>          p->quit = false;
>> >> +        p->pending_job = 0;
>> >>          p->id = i;
>> >>
>> >> +        p->data = g_new0(MultiFDRecvData, 1);
>> >> +        p->data->size = 0;
>> >> +
>> >>          if (use_packets) {
>> >>              p->packet_len = sizeof(MultiFDPacket_t)
>> >>                            + sizeof(uint64_t) * page_count;
>> >> diff --git a/migration/multifd.h b/migration/multifd.h
>> >> index 406d42dbae..abaf16c3f2 100644
>> >> --- a/migration/multifd.h
>> >> +++ b/migration/multifd.h
>> >> @@ -13,6 +13,8 @@
>> >>  #ifndef QEMU_MIGRATION_MULTIFD_H
>> >>  #define QEMU_MIGRATION_MULTIFD_H
>> >>
>> >> +typedef struct MultiFDRecvData MultiFDRecvData;
>> >> +
>> >>  int multifd_save_setup(Error **errp);
>> >>  void multifd_save_cleanup(void);
>> >>  int multifd_load_setup(Error **errp);
>> >> @@ -24,6 +26,8 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
>> >>  void multifd_recv_sync_main(void);
>> >>  int multifd_send_sync_main(QEMUFile *f);
>> >>  int multifd_queue_page(QEMUFile *f, RAMBlock *block, ram_addr_t offset);
>> >> +int multifd_recv(void);
>> >> +MultiFDRecvData *multifd_get_recv_data(void);
>> >>
>> >>  /* Multifd Compression flags */
>> >>  #define MULTIFD_FLAG_SYNC (1 << 0)
>> >> @@ -66,6 +70,13 @@ typedef struct {
>> >>      RAMBlock *block;
>> >>  } MultiFDPages_t;
>> >>
>> >> +struct MultiFDRecvData {
>> >> +    void *opaque;
>> >> +    size_t size;
>> >> +    /* for preadv */
>> >> +    off_t file_offset;
>> >> +};
>> >> +
>> >>  typedef struct {
>> >>      /* Fields are only written at creating/deletion time */
>> >>      /* No lock required for them, they are read only */
>> >> @@ -156,6 +167,8 @@ typedef struct {
>> >>
>> >>      /* syncs main thread and channels */
>> >>      QemuSemaphore sem_sync;
>> >> +    /* sem where to wait for more work */
>> >> +    QemuSemaphore sem;
>> >>
>> >>      /* this mutex protects the following parameters */
>> >>      QemuMutex mutex;
>> >> @@ -167,6 +180,13 @@ typedef struct {
>> >>      uint32_t flags;
>> >>      /* global number of generated multifd packets */
>> >>      uint64_t packet_num;
>> >> +    int pending_job;
>> >> +    /*
>> >> +     * The owner of 'data' depends of 'pending_job' value:
>> >> +     * pending_job == 0 -> migration_thread can use it.
>> >> +     * pending_job != 0 -> multifd_channel can use it.
>> >> +     */
>> >> +    MultiFDRecvData *data;
>> >
>> > Right after the main thread assigns a chunk of memory to load for a recv
>> > thread, the main thread job done, afaict. I don't see how a race could
>> > happen here.
>> >
>> > I'm not sure, but I _think_ if we rely on p->quite or something similar to
>> > quite all recv threads, then this can be dropped?
>> >
>>
>> We still need to know whether a channel is in use so we can skip to the
>> next.
>
> Oh, yes.
>
>>
>> >>
>> >>      /* thread local variables. No locking required */
>> >>
>> >> --
>> >> 2.35.3
>> >>
>>
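Btw, regarding the PS above about merging this with multifd_send_pages():
yes, I think that's where this ends up. Just to illustrate the direction,
a rough sketch of a common "hand work to an idle channel" routine
(MultiFDChannelState and the other names here are hypothetical;
QemuMutex/QemuSemaphore come from qemu/thread.h):

    typedef struct {
        QemuMutex mutex;
        QemuSemaphore sem;
        bool quit;
        bool pending_job;
        void *work;
    } MultiFDChannelState;

    static int multifd_dispatch(MultiFDChannelState *channels, int nchannels,
                                void *work)
    {
        static int next_channel;
        int i;

        next_channel %= nchannels;
        for (i = next_channel;; i = (i + 1) % nchannels) {
            MultiFDChannelState *c = &channels[i];

            qemu_mutex_lock(&c->mutex);
            if (c->quit) {
                qemu_mutex_unlock(&c->mutex);
                return -1;
            }
            if (!c->pending_job) {
                c->pending_job = true;
                c->work = work;          /* ownership passes to the channel */
                next_channel = (i + 1) % nchannels;
                qemu_mutex_unlock(&c->mutex);
                qemu_sem_post(&c->sem);  /* wake up the channel thread */
                return 0;
            }
            qemu_mutex_unlock(&c->mutex);
        }
    }

The send side would pass the pages to be written (to socket or file) as
'work', the recv side would pass the offset+size to be read from the file,
and the channel thread would clear pending_job once it's done, same as in
the hunks above.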