From: Peter Xu
Multifd provides a threaded model for processing jobs. On the sender
side, there can be two kinds of job: (1) a list of pages to send, or
(2) a sync request.
The sync request is a very special kind of job. It never contains a page
array, but only a multifd packet telling the dest side to synchronize with
the pages already sent.
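
Conceptually, a sync request ends up as a bare packet header on the wire.
A simplified sketch (field names follow MultiFDPacket_t; unrelated fields
elided):

    /* Sketch: a sync packet carries a header only, no page payload */
    packet->flags = cpu_to_be32(MULTIFD_FLAG_SYNC);
    packet->normal_pages = cpu_to_be32(0);           /* no pages attached */
    packet->packet_num = cpu_to_be64(p->packet_num);
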
Before this patch, both requests use the pending_job field: no matter what
the request is, it bumps pending_job, and the multifd sender thread
decrements it after finishing one job.
However this can be racy, because SYNC is special: it also needs to set
MULTIFD_FLAG_SYNC in p->flags to mark the request as a sync.
Consider a sequence of operations where:
- the migration thread enqueues a job to send some pages, pending_job++ (0->1)
- [...before the selected multifd sender thread wakes up...]
- the migration thread enqueues another job to sync, pending_job++ (1->2),
  and sets p->flags=MULTIFD_FLAG_SYNC
- the multifd sender thread wakes up and finds pending_job==2
- it sends the 1st packet with MULTIFD_FLAG_SYNC and the list of pages
- it sends the 2nd packet with flags==0 and no pages
This is not expected: MULTIFD_FLAG_SYNC should only be processed after all
the pages are received. Meanwhile, the 2nd packet is completely useless,
carrying zero information.
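
In condensed form, the problematic interleaving is:

    /* migration thread, job 1 (pages): */
    p->pending_job++;                   /* 0 -> 1 */
    /* migration thread, job 2 (sync): */
    p->flags |= MULTIFD_FLAG_SYNC;
    p->pending_job++;                   /* 1 -> 2 */
    /* sender thread wakes up with pending_job == 2:
     *   packet 1: pages + MULTIFD_FLAG_SYNC (flags cached, then cleared)
     *   packet 2: flags == 0, no pages, i.e. useless
     */
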
I didn't verify the above, but I believe the issue is still benign, in
that at least on the recv side we always receive pages before handling
MULTIFD_FLAG_SYNC. However that ordering is not guaranteed by design; it
is just tricky.
Another reason to separate the two is that using p->flags to communicate
between the two threads is not clearly defined; it is very hard to read
and understand why accessing p->flags is always safe. See the current
implementation of multifd_send_thread(), where we try to cache only
p->flags (condensed below). It doesn't need to be that complicated.
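
For reference, the caching pattern in question looks roughly like this
(a condensed sketch of the current multifd_send_thread(), not the full
code):

    qemu_mutex_lock(&p->mutex);
    ...
    flags = p->flags;   /* cache it: a requester may have set SYNC */
    p->flags = 0;       /* must clear it before dropping the lock */
    ...
    qemu_mutex_unlock(&p->mutex);
    /* past this point only the cached 'flags' copy is safe to use */
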
This patch introduces pending_sync, a separate flag just to show that the
requester needs a sync. Alongside, we can remove the tricky caching of
p->flags, because after this patch p->flags is only ever touched by the
multifd sender thread, which makes it crystal clear that accessing
p->flags is always thread safe. A condensed sketch of the resulting
dispatch follows.
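
In condensed form (packet sending and error handling elided; this is a
sketch, not the exact final code), the sender thread then dispatches as:

    qemu_mutex_lock(&p->mutex);
    if (p->pending_job) {
        /* normal job: prepare and send the page array */
        ...
        p->pending_job = false;
    } else if (p->pending_sync) {
        /* sync request: only the sender thread writes p->flags now */
        p->flags = MULTIFD_FLAG_SYNC;
        multifd_send_fill_packet(p);
        p->pending_sync = false;
        ...
    }
    qemu_mutex_unlock(&p->mutex);
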
With that, we can also safely convert pending_job into a boolean, because
we never support more than one pending job anyway.
Signed-off-by: Peter Xu
---
 migration/multifd.h | 13 +++++++++++--
 migration/multifd.c | 29 ++++++++++++++++++-----------
 2 files changed, 29 insertions(+), 13 deletions(-)
diff --git a/migration/multifd.h b/migration/multifd.h
index 3920bdbcf1..08f26ef3fe 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -99,8 +99,17 @@ typedef struct {
     uint32_t flags;
     /* global number of generated multifd packets */
     uint64_t packet_num;
-    /* thread has work to do */
-    int pending_job;
+    /*
+     * The sender thread has work to do if either boolean below is set.
+     *
+     * @pending_job:  a job is pending
+     * @pending_sync: a sync request is pending
+     *
+     * Both of these fields are only set by the requesters, and cleared
+     * by the multifd sender threads.
+     */
+    bool pending_job;
+    bool pending_sync;
     /* array of pages to sent.
      * The owner of 'pages' depends of 'pending_job' value:
      * pending_job == 0 -> migration_thread can use it.
diff --git a/migration/multifd.c b/migration/multifd.c
index 8bb1fd95cf..6a4863edd2 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -443,7 +443,7 @@ static int multifd_send_pages(void)
         p = &multifd_send_state->params[i];
         qemu_mutex_lock(&p->mutex);
         if (!p->pending_job) {
-            p->pending_job++;
+            p->pending_job = true;
             next_channel = (i + 1) % migrate_multifd_channels();
             break;
         }
@@ -631,8 +631,7 @@ int multifd_send_sync_main(void)
 
         qemu_mutex_lock(&p->mutex);
         p->packet_num = multifd_send_state->packet_num++;
-        p->flags |= MULTIFD_FLAG_SYNC;
-        p->pending_job++;
+        p->pending_sync = true;
         qemu_mutex_unlock(&p->mutex);
         qemu_sem_post(&p->sem);
     }
@@ -688,7 +687,6 @@ static void *multifd_send_thread(void *opaque)
         if (p->pending_job) {
             uint64_t packet_num = p->packet_num;
             MultiFDPages_t *pages = p->pages;
-            uint32_t flags;
 
             if (use_zero_copy_send) {
                 p->iovs_num = 0;
@@ -704,13 +702,11 @@ static void *multifd_send_thread(void *opaque)
                 }
             }
             multifd_send_fill_packet(p);
-            flags = p->flags;
-            p->flags = 0;
             p->num_packets++;
             p->total_normal_pages += pages->num;
             qemu_mutex_unlock(&p->mutex);
 
-            trace_multifd_send(p->id, packet_num, pages->num, flags,
+            trace_multifd_send(p->id, packet_num, pages->num, p->flags,
                                p->next_packet_size);
 
             if (use_zero_copy_send) {