From: Jiahui Cen <cenjia...@huawei.com>

Multifd assumes the migration thread IOChannel is always established before
the multifd IOChannels, but this assumption will be broken in many situations
like network packet loss.

For example:
Step1: Source (migration thread IOChannel)  --SYN-->  Destination
Step2: Source (migration thread IOChannel)  <--SYNACK  Destination
Step3: Source (migration thread IOChannel, lost) --ACK-->X  Destination
Step4: Source (multifd IOChannel) --SYN-->    Destination
Step5: Source (multifd IOChannel) <--SYNACK   Destination
Step6: Source (multifd IOChannel, ESTABLISHED) --ACK-->  Destination
Step7: Destination accepts multifd IOChannel
Step8: Source (migration thread IOChannel, ESTABLISHED) -ACK,DATA->  Destination
Step9: Destination accepts migration thread IOChannel

The above situation can be reproduced by creating a weak network environment,
such as "tc qdisc add dev eth0 root netem loss 50%". The wrong acception order
will cause magic check failure and thus lead to migration failure.

This patch fixes this issue by sending a migration IOChannel initial packet with
a unique id when using multifd migration. Since the multifd IOChannels will also
send initial packets, the destination can judge whether the processing IOChannel
belongs to multifd by checking the id in the initial packet. This mechanism can
ensure that different IOChannels will go to correct branches in our test.

Signed-off-by: Jiahui Cen <cenjia...@huawei.com>
Signed-off-by: Ying Fang <fangyi...@huawei.com>
---
 migration/channel.c   |  9 +++++++++
 migration/migration.c | 49 ++++++++++++++++++++++++-------------------------
 migration/migration.h |  3 +++
 migration/ram.c       | 38 +++++++++++++-------------------------
 migration/ram.h       |  3 ++-
 migration/socket.c    |  7 +++++++
 6 files changed, 58 insertions(+), 51 deletions(-)

diff --git a/migration/channel.c b/migration/channel.c
index 20e4c8e..7462181 100644
--- a/migration/channel.c
+++ b/migration/channel.c
@@ -82,6 +82,15 @@ void migration_channel_connect(MigrationState *s,
                 return;
             }
         } else {
+            if (migrate_use_multifd()) {
+                /* multifd migration cannot distinguish migration IOChannel
+                 * from multifd IOChannels, so we need to send an initial 
packet
+                 * to show it is migration IOChannel
+                 */
+                migration_send_initial_packet(ioc,
+                                              migrate_multifd_channels(),
+                                              &error);
+            }
             QEMUFile *f = qemu_fopen_channel_output(ioc);
 
             qemu_mutex_lock(&s->qemu_file_lock);
diff --git a/migration/migration.c b/migration/migration.c
index 3febd0f..3da2baf 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -521,12 +521,6 @@ static void migration_incoming_setup(QEMUFile *f)
 {
     MigrationIncomingState *mis = migration_incoming_get_current();
 
-    if (multifd_load_setup() != 0) {
-        /* We haven't been able to create multifd threads
-           nothing better to do */
-        exit(EXIT_FAILURE);
-    }
-
     if (!mis->from_src_file) {
         mis->from_src_file = f;
     }
@@ -584,36 +578,41 @@ void migration_fd_process_incoming(QEMUFile *f)
 void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
 {
     MigrationIncomingState *mis = migration_incoming_get_current();
-    bool start_migration;
-
-    if (!mis->from_src_file) {
-        /* The first connection (multifd may have multiple) */
-        QEMUFile *f = qemu_fopen_channel_input(ioc);
+    Error *local_err = NULL;
+    int id = 0;
 
-        /* If it's a recovery, we're done */
-        if (postcopy_try_recover(f)) {
-            return;
-        }
+    if (migrate_use_multifd()) {
+        id = migration_recv_initial_packet(ioc, &local_err);
+    }
+    if (!migrate_use_multifd() || id == migrate_multifd_channels()) {
+        if (!mis->from_src_file) {
+            /* The migration connection (multifd may have multiple) */
+            QEMUFile *f = qemu_fopen_channel_input(ioc);
 
-        migration_incoming_setup(f);
+            /* If it's a recovery, we're done */
+            if (postcopy_try_recover(f)) {
+                return;
+            }
 
-        /*
-         * Common migration only needs one channel, so we can start
-         * right now.  Multifd needs more than one channel, we wait.
-         */
-        start_migration = !migrate_use_multifd();
-    } else {
-        Error *local_err = NULL;
+            migration_incoming_setup(f);
+        }
+    } else if (id >= 0) {
         /* Multiple connections */
         assert(migrate_use_multifd());
-        start_migration = multifd_recv_new_channel(ioc, &local_err);
+        multifd_recv_new_channel(ioc, id, &local_err);
         if (local_err) {
             error_propagate(errp, local_err);
             return;
         }
+    } else {
+        /* Bad connections */
+        multifd_recv_terminate_threads(local_err);
+        error_propagate(errp, local_err);
+        return;
     }
 
-    if (start_migration) {
+    /* Once we have all the channels we need, we can start migration */
+    if (migration_has_all_channels()) {
         migration_incoming_process();
     }
 }
diff --git a/migration/migration.h b/migration/migration.h
index 4f2fe19..ba8caa4 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -339,4 +339,7 @@ int foreach_not_ignored_block(RAMBlockIterFunc func, void 
*opaque);
 void migration_make_urgent_request(void);
 void migration_consume_urgent_request(void);
 
+int migration_send_initial_packet(QIOChannel *c, uint8_t id, Error **errp);
+int migration_recv_initial_packet(QIOChannel *c, Error **errp);
+
 #endif
diff --git a/migration/ram.c b/migration/ram.c
index 24a8906..2220875 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -593,7 +593,7 @@ typedef struct {
     uint8_t id;
     uint8_t unused1[7];     /* Reserved for future use */
     uint64_t unused2[4];    /* Reserved for future use */
-} __attribute__((packed)) MultiFDInit_t;
+} __attribute__((packed)) MigrationInit_t;
 
 typedef struct {
     uint32_t magic;
@@ -702,26 +702,26 @@ typedef struct {
     QemuSemaphore sem_sync;
 } MultiFDRecvParams;
 
-static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
+int migration_send_initial_packet(QIOChannel *c, uint8_t id, Error **errp)
 {
-    MultiFDInit_t msg;
+    MigrationInit_t msg;
     int ret;
 
     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
     msg.version = cpu_to_be32(MULTIFD_VERSION);
-    msg.id = p->id;
+    msg.id = id;
     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
 
-    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
+    ret = qio_channel_write_all(c, (char *)&msg, sizeof(msg), errp);
     if (ret != 0) {
         return -1;
     }
     return 0;
 }
 
-static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
+int migration_recv_initial_packet(QIOChannel *c, Error **errp)
 {
-    MultiFDInit_t msg;
+    MigrationInit_t msg;
     int ret;
 
     ret = qio_channel_read_all(c, (char *)&msg, sizeof(msg), errp);
@@ -756,8 +756,8 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error 
**errp)
     }
 
     if (msg.id > migrate_multifd_channels()) {
-        error_setg(errp, "multifd: received channel version %d "
-                   "expected %d", msg.version, MULTIFD_VERSION);
+        error_setg(errp, "multifd: received channel id %d "
+                   "expected [0-%d]", msg.id, migrate_multifd_channels());
         return -1;
     }
 
@@ -1112,7 +1112,7 @@ static void *multifd_send_thread(void *opaque)
     trace_multifd_send_thread_start(p->id);
     rcu_register_thread();
 
-    if (multifd_send_initial_packet(p, &local_err) < 0) {
+    if (migration_send_initial_packet(p->c, p->id, &local_err) < 0) {
         ret = -1;
         goto out;
     }
@@ -1259,7 +1259,7 @@ struct {
     uint64_t packet_num;
 } *multifd_recv_state;
 
-static void multifd_recv_terminate_threads(Error *err)
+void multifd_recv_terminate_threads(Error *err)
 {
     int i;
 
@@ -1480,21 +1480,11 @@ bool multifd_recv_all_channels_created(void)
  * - Return false and do not set @errp when correctly receiving the current 
one;
  * - Return false and set @errp when failing to receive the current channel.
  */
-bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
+void multifd_recv_new_channel(QIOChannel *ioc, int id, Error **errp)
 {
     MultiFDRecvParams *p;
     Error *local_err = NULL;
-    int id;
 
-    id = multifd_recv_initial_packet(ioc, &local_err);
-    if (id < 0) {
-        multifd_recv_terminate_threads(local_err);
-        error_propagate_prepend(errp, local_err,
-                                "failed to receive packet"
-                                " via multifd channel %d: ",
-                                atomic_read(&multifd_recv_state->count));
-        return false;
-    }
     trace_multifd_recv_new_channel(id);
 
     p = &multifd_recv_state->params[id];
@@ -1503,7 +1493,7 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error 
**errp)
                    id);
         multifd_recv_terminate_threads(local_err);
         error_propagate(errp, local_err);
-        return false;
+        return;
     }
     p->c = ioc;
     object_ref(OBJECT(ioc));
@@ -1514,8 +1504,6 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error 
**errp)
     qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
                        QEMU_THREAD_JOINABLE);
     atomic_inc(&multifd_recv_state->count);
-    return atomic_read(&multifd_recv_state->count) ==
-           migrate_multifd_channels();
 }
 
 /**
diff --git a/migration/ram.h b/migration/ram.h
index bd0eee7..a788ff0 100644
--- a/migration/ram.h
+++ b/migration/ram.h
@@ -46,7 +46,8 @@ void multifd_save_cleanup(void);
 int multifd_load_setup(void);
 int multifd_load_cleanup(Error **errp);
 bool multifd_recv_all_channels_created(void);
-bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
+void multifd_recv_new_channel(QIOChannel *ioc, int id, Error **errp);
+void multifd_recv_terminate_threads(Error *err);
 
 uint64_t ram_pagesize_summary(void);
 int ram_save_queue_pages(const char *rbname, ram_addr_t start, ram_addr_t len);
diff --git a/migration/socket.c b/migration/socket.c
index 97c9efd..f3bd80d 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -22,6 +22,7 @@
 #include "channel.h"
 #include "socket.h"
 #include "migration.h"
+#include "ram.h"
 #include "qemu-file.h"
 #include "io/channel-socket.h"
 #include "io/net-listener.h"
@@ -186,6 +187,12 @@ static void socket_start_incoming_migration(SocketAddress 
*saddr,
         num = migrate_multifd_channels();
     }
 
+    if (multifd_load_setup() != 0) {
+        /* We haven't been able to create multifd threads
+           nothing better to do */
+        exit(EXIT_FAILURE);
+    }
+
     if (qio_net_listener_open_sync(listener, saddr, num, errp) < 0) {
         object_unref(OBJECT(listener));
         return;
-- 
1.8.3.1



Reply via email to