"Dr. David Alan Gilbert" <dgilb...@redhat.com> wrote: > * Juan Quintela (quint...@redhat.com) wrote: >> We synchronize all threads each RAM_SAVE_FLAG_EOS. Bitmap >> synchronizations don't happen inside a ram section, so we are safe >> about two channels trying to overwrite the same memory. > > OK, that's quite neat - so you don't need any extra flags in the stream > to do the sync; it probably needs a comment in the code somewhere so we > don't forget!
Thanks. >> Signed-off-by: Juan Quintela <quint...@redhat.com> >> --- >> migration/ram.c | 118 +++++++++++++++++++++++++++++++++++++---- >> migration/trace-events | 6 +++ >> 2 files changed, 113 insertions(+), 11 deletions(-) >> >> diff --git a/migration/ram.c b/migration/ram.c >> index c4c185cc4c..398cb0af3b 100644 >> --- a/migration/ram.c >> +++ b/migration/ram.c >> @@ -405,6 +405,8 @@ static void compress_threads_save_setup(void) >> #define MULTIFD_MAGIC 0x11223344U >> #define MULTIFD_VERSION 1 >> >> +#define MULTIFD_FLAG_SYNC (1 << 0) >> + >> typedef struct { >> uint32_t magic; >> uint32_t version; >> @@ -471,6 +473,8 @@ typedef struct { >> uint32_t num_packets; >> /* pages sent through this channel */ >> uint32_t num_pages; >> + /* syncs main thread and channels */ >> + QemuSemaphore sem_sync; >> } MultiFDSendParams; >> >> typedef struct { >> @@ -507,6 +511,8 @@ typedef struct { >> uint32_t num_packets; >> /* pages sent through this channel */ >> uint32_t num_pages; >> + /* syncs main thread and channels */ >> + QemuSemaphore sem_sync; >> } MultiFDRecvParams; >> >> static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) >> @@ -682,6 +688,10 @@ struct { >> int count; >> /* array of pages to sent */ >> MultiFDPages_t *pages; >> + /* syncs main thread and channels */ >> + QemuSemaphore sem_sync; >> + /* global number of generated multifd packets */ >> + uint32_t seq; > > It's interesting you use the same comment for 'seq' in > MultiFDSendParams - but I guess that means only this one is the global > version and the others aren't really global number - they're just > local to that thread? Only place that "increases/generates" seq is multifd_send_pages(), that is what creates a new packet to be sent. So, if we see _any_ packet on the wire, we know the real global ordering. They are only used for traces, to se that packet 42 was sent through channel 3, and on reception you check that packet 42 is what you received through channel 3. They only appears on traces, but I find they useful for debugging synchcronization errors. >> + for (i = 0; i < migrate_multifd_channels(); i++) { >> + MultiFDSendParams *p = &multifd_send_state->params[i]; >> + >> + trace_multifd_send_sync_main_signal(p->id); >> + >> + qemu_mutex_lock(&p->mutex); >> + p->flags |= MULTIFD_FLAG_SYNC; >> + p->pending_job++; >> + qemu_mutex_unlock(&p->mutex); >> + qemu_sem_post(&p->sem); >> + } [1] >> + for (i = 0; i < migrate_multifd_channels(); i++) { >> + MultiFDSendParams *p = &multifd_send_state->params[i]; >> + >> + trace_multifd_send_sync_main_wait(p->id); >> + qemu_sem_wait(&multifd_send_state->sem_sync); >> + } [2] >> + trace_multifd_send_sync_main(multifd_send_state->seq); >> +} >> + > > OK, so this just makes each of the sending threads ack, so that seems > OK. > But what happens with an error? multifd_send_sync_main exits it's > loop with a 'break' if the writes fail, and that could mean they never > come and post the flag-sync sem. Let's see. [1]: we are just doing mutex_lock/sem_post(), if we are not able to do that, we have got a big race that needs to be fixed. So that bit is ok. [2]: We do an unconditional sem_wait(). Looking at the worker code. In this patch level, we are ok, but I agree with you than in later patches, we need to also do the post on the error case. Changing. >> + >> + trace_multifd_recv_sync_main_wait(p->id); >> + qemu_sem_wait(&multifd_recv_state->sem_sync); >> + qemu_mutex_lock(&p->mutex); >> + if (multifd_recv_state->seq < p->seq) { >> + multifd_recv_state->seq = p->seq; >> + } > > Can you explain what this is for? > Something like the latest received block? When we are at a synhronization point, we don't know on the main thread when that synchronization happened (at what packet considered as a logical list of packages). So, we choose 'seq' from the channel with the highest number. That is the one that we want. We only use this for tracing, so we can "match" that we did a synchronization on the send side at packet N and we see the trace at reception side that we did it at packet N also. Remember than in a previous patch you asked me what happened if this does a wark around? At that point nothing. But now I need to change this code to be. multifd_recv_state->seq = 0; for (i = 0; i < migrate_multifd_channels(); i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; ... if (multifd_recv_state->seq < p->seq) { multifd_recv_state->seq = p->seq; } And I have fixed the workaround problem, no? >> @@ -933,9 +1019,8 @@ static void *multifd_recv_thread(void *opaque) >> trace_multifd_recv_thread_start(p->id); >> >> while (true) { >> - qemu_sem_wait(&p->sem); >> qemu_mutex_lock(&p->mutex); >> - if (p->pending_job) { >> + if (true || p->pending_job) { > > A TODO I guess??? Oops, that should be out. Fixed on next version. >> uint32_t used; >> uint32_t flags; >> qemu_mutex_unlock(&p->mutex); >> @@ -956,14 +1041,18 @@ static void *multifd_recv_thread(void *opaque) >> p->num_packets++; >> p->num_pages += used; >> qemu_mutex_unlock(&p->mutex); >> + >> + if (flags & MULTIFD_FLAG_SYNC) { >> + qemu_sem_post(&multifd_recv_state->sem_sync); >> + qemu_sem_wait(&p->sem_sync); >> + } > > Can you explain the receive side logic - I think this is waiting for all > receive threads to 'ack' - but how do we know that they've finished > receiving all data that was sent? Because they need to receive a packet with MULTIFD_FLAG_SYNC sent. And if they receive that flag, we know that is the last one of the sequence. synchrconization works like (2 channels to make things easy): main thread: we finish a RAM_SECTION; flush pending packets to one of the channels send packet with MULTIFD_FLAG_SYNC for all the channels wait unil all the channels have processesed the FLAG_SYNC At this point send the RAM_SECTION_EOS footer. worker1 worker 2 if there is a pending packet, send it if there is a pending packet, send it (notice that there can't be more than one ever) send a pacet with SYNC flag set send a pacet with SYNC flag set On recetpion side main thread receives RAM_SECTION_EOS footer wait for works to receive a sync worker1 worker1 process any pending packet(no sync) process any pending packet(no sync) process packet with SYNC process packet with SYNC post main thread post main thread now main thread can continue Notice that we don't care what happens first, receiving packet with SYNC in workeers or RAM_SECTION_EOS on main thread, all works as expected. Noticing how long took to explain this, I think that I am going to add this to migration documentation. Will wait for any question you had before adding it. Later, Juan.