From: Peter Xu <pet...@redhat.com>

The sender thread yields p->mutex before IO starts, trying not to block
the requester thread.  This may be an unnecessary lock optimization: the
requester can already read pending_job safely even without the lock,
since the requester is currently the only one who can assign a task.

Drop that lock complication on both sides:

  (1) in the sender thread, always hold the mutex until the job is done
  (2) in the requester thread, check that pending_job is clear locklessly

Reviewed-by: Fabiano Rosas <faro...@suse.de>
Link: https://lore.kernel.org/r/20240202102857.110210-8-pet...@redhat.com
Signed-off-by: Peter Xu <pet...@redhat.com>
---
 migration/multifd.c | 23 ++++++++++++++++-------
 1 file changed, 16 insertions(+), 7 deletions(-)

diff --git a/migration/multifd.c b/migration/multifd.c
index ea25bbe6bd..4d5a01ed93 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -429,7 +429,9 @@ static int multifd_send_pages(void)
         return -1;
     }
 
+    /* We wait here, until at least one channel is ready */
     qemu_sem_wait(&multifd_send_state->channels_ready);
+
     /*
      * next_channel can remain from a previous migration that was
      * using more channels, so ensure it doesn't overflow if the
@@ -441,17 +443,26 @@ static int multifd_send_pages(void)
             return -1;
         }
         p = &multifd_send_state->params[i];
-        qemu_mutex_lock(&p->mutex);
+        /*
+         * Lockless read to p->pending_job is safe, because only multifd
+         * sender thread can clear it.
+         */
         if (qatomic_read(&p->pending_job) == false) {
-            qatomic_set(&p->pending_job, true);
             next_channel = (i + 1) % migrate_multifd_channels();
             break;
         }
-        qemu_mutex_unlock(&p->mutex);
     }
+
+    qemu_mutex_lock(&p->mutex);
     assert(!p->pages->num);
     assert(!p->pages->block);
-
+    /*
+     * Double check on pending_job==false with the lock.  In the future if
+     * we can have >1 requester thread, we can replace this with a "goto
+     * retry", but that is for later.
+     */
+    assert(qatomic_read(&p->pending_job) == false);
+    qatomic_set(&p->pending_job, true);
     p->packet_num = multifd_send_state->packet_num++;
     multifd_send_state->pages = p->pages;
     p->pages = pages;
@@ -709,8 +720,6 @@ static void *multifd_send_thread(void *opaque)
             multifd_send_fill_packet(p);
             p->num_packets++;
             p->total_normal_pages += pages->num;
-            qemu_mutex_unlock(&p->mutex);
-
             trace_multifd_send(p->id, packet_num, pages->num, p->flags,
                                p->next_packet_size);
 
@@ -730,6 +739,7 @@ static void *multifd_send_thread(void *opaque)
             ret = qio_channel_writev_full_all(p->c, p->iov, p->iovs_num, NULL,
                                               0, p->write_flags, &local_err);
             if (ret != 0) {
+                qemu_mutex_unlock(&p->mutex);
                 break;
             }
 
@@ -738,7 +748,6 @@ static void *multifd_send_thread(void *opaque)
 
             multifd_pages_reset(p->pages);
             p->next_packet_size = 0;
-            qemu_mutex_lock(&p->mutex);
             qatomic_set(&p->pending_job, false);
             qemu_mutex_unlock(&p->mutex);
         } else if (qatomic_read(&p->pending_sync)) {
-- 
2.43.0


Reply via email to