The receiver keeps running the incoming migration loop until the migration connection is lost; it is then started as the backup VM.
The receiver does not load vm state once a migration begins, instead, it perfetches one whole migration data into a buffer, then loads vm state from that buffer afterwards. Signed-off-by: Jules Wang <junqing.w...@cs2c.com.cn> --- include/migration/qemu-file.h | 1 + include/sysemu/sysemu.h | 1 + migration.c | 22 ++++-- savevm.c | 154 ++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 168 insertions(+), 10 deletions(-) diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h index 0f757fb..f01ff10 100644 --- a/include/migration/qemu-file.h +++ b/include/migration/qemu-file.h @@ -92,6 +92,7 @@ typedef struct QEMUFileOps { QEMURamHookFunc *after_ram_iterate; QEMURamHookFunc *hook_ram_load; QEMURamSaveFunc *save_page; + QEMUFileGetBufferFunc *get_prefetch_buffer; /* called with the QEMUFile itself as opaque (see qemu_peek_buffer() in savevm.c) */ } QEMUFileOps; QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops); diff --git a/include/sysemu/sysemu.h b/include/sysemu/sysemu.h index b1aa059..44f23d0 100644 --- a/include/sysemu/sysemu.h +++ b/include/sysemu/sysemu.h @@ -81,6 +81,7 @@ void qemu_savevm_state_complete(QEMUFile *f); void qemu_savevm_state_cancel(void); uint64_t qemu_savevm_state_pending(QEMUFile *f, uint64_t max_size); int qemu_loadvm_state(QEMUFile *f); +int qemu_loadvm_state_ft(QEMUFile *f); /* SLIRP */ void do_info_slirp(Monitor *mon); diff --git a/migration.c b/migration.c index d8a9b2d..9be22a4 100644 --- a/migration.c +++ b/migration.c @@ -19,6 +19,7 @@ #include "monitor/monitor.h" #include "migration/qemu-file.h" #include "sysemu/sysemu.h" +#include "sysemu/cpus.h" #include "block/block.h" #include "qemu/sockets.h" #include "migration/block.h" @@ -112,13 +113,24 @@ static void process_incoming_migration_co(void *opaque) { QEMUFile *f = opaque; int ret; + int count = 0; - ret = qemu_loadvm_state(f); - qemu_fclose(f); - if (ret < 0) { - fprintf(stderr, "load of migration failed\n"); - exit(EXIT_FAILURE); + if (ft_enabled()) { + while (qemu_loadvm_state_ft(f) >= 0) { /* NOTE(review): qemu_loadvm_state_ft() can return positive EINVAL or ENOMEM, which this test treats as success, so the loop keeps going instead of stopping */ + count++; + DPRINTF("incoming 
count %d\r", count); + } + qemu_fclose(f); + fprintf(stderr, "ft connection lost, launching self..\n"); /* NOTE(review): also reached when qemu_loadvm_state() fails on a prefetched round; the VM then continues with whatever device state was loaded before the failure */ + } else { + ret = qemu_loadvm_state(f); + qemu_fclose(f); + if (ret < 0) { + fprintf(stderr, "load of migration failed\n"); + exit(EXIT_FAILURE); + } } + cpu_synchronize_all_post_init(); /* moved here from qemu_loadvm_state(); NOTE(review): other qemu_loadvm_state() callers not shown in this patch lose this call - confirm */ qemu_announce_self(); DPRINTF("successfully loaded vm state\n"); diff --git a/savevm.c b/savevm.c index 6daf690..d5bf153 100644 --- a/savevm.c +++ b/savevm.c @@ -52,6 +52,8 @@ #define ARP_PTYPE_IP 0x0800 #define ARP_OP_REQUEST_REV 0x3 +#define PFB_SIZE 0x010000 /* initial prefetch buffer size (64 KiB); doubled on demand in qemu_loadvm_state_ft() */ + static int announce_self_create(uint8_t *buf, uint8_t *mac_addr) { @@ -135,6 +137,10 @@ struct QEMUFile { unsigned int iovcnt; int last_error; + + uint8_t *pfb; /* pfb -> PreFetch Buffer */ + uint64_t pfb_index; /* read position within pfb */ + uint64_t pfb_size; /* number of valid bytes in pfb */ }; typedef struct QEMUFileStdio @@ -193,6 +199,25 @@ static int socket_get_buffer(void *opaque, uint8_t *buf, int64_t pos, int size) return len; } +static int socket_get_prefetch_buffer(void *opaque, uint8_t *buf, + int64_t pos, int size) +{ + QEMUFile *f = opaque; /* opaque is the QEMUFile itself here, unlike the QEMUFileSocket that socket_close() and the other socket ops receive */ + + if (f->pfb_size - pos <= 0) { /* NOTE(review): pfb_size is uint64_t, so this difference is unsigned and only pos == pfb_size is caught; pos > pfb_size would wrap and read past pfb */ + return 0; + } + + if (f->pfb_size - pos < size) { + size = f->pfb_size - pos; + } + + memcpy(buf, f->pfb+pos, size); + + return size; +} + + static int socket_close(void *opaque) { QEMUFileSocket *s = opaque; @@ -440,6 +465,7 @@ QEMUFile *qemu_fdopen(int fd, const char *mode) static const QEMUFileOps socket_read_ops = { .get_fd = socket_get_fd, .get_buffer = socket_get_buffer, + .get_prefetch_buffer = socket_get_prefetch_buffer, .close = socket_close }; @@ -493,7 +519,7 @@ QEMUFile *qemu_fopen(const char *filename, const char *mode) s->stdio_file = fopen(filename, mode); if (!s->stdio_file) goto fail; - + if(mode[0] == 'w') { s->file = qemu_fopen_ops(s, &stdio_file_write_ops); } else { @@ -739,6 +765,11 @@ int qemu_fclose(QEMUFile *f) if (f->last_error) { ret = f->last_error; } + + if (f->pfb) { + g_free(f->pfb); + } + g_free(f); return ret; } @@ -822,6 +853,14 @@ void qemu_put_byte(QEMUFile *f, 
int v) static void qemu_file_skip(QEMUFile *f, int size) { + if (f->pfb_index + size <= f->pfb_size) { /* consume from the prefetch buffer first; any remainder is skipped in buf below */ + f->pfb_index += size; + return; + } else { + size -= f->pfb_size - f->pfb_index; + f->pfb_index = f->pfb_size; + } + if (f->buf_index + size <= f->buf_size) { f->buf_index += size; } @@ -831,6 +870,21 @@ static int qemu_peek_buffer(QEMUFile *f, uint8_t *buf, int size, size_t offset) { int pending; int index; + int done; + + if (f->ops->get_prefetch_buffer) { + if (f->pfb_index + offset < f->pfb_size) { + done = f->ops->get_prefetch_buffer(f, buf, f->pfb_index + offset, + size); + if (done == size) { + return size; + } + size -= done; + buf += done; /* NOTE(review): offset is not reset to 0 here, so for offset > 0 the remainder is peeked at buf_index + offset; the value returned further down presumably also omits the done bytes copied from pfb - confirm */ + } else { + offset -= f->pfb_size - f->pfb_index; + } + } assert(!qemu_file_is_writable(f)); @@ -875,7 +929,15 @@ int qemu_get_buffer(QEMUFile *f, uint8_t *buf, int size) static int qemu_peek_byte(QEMUFile *f, int offset) { - int index = f->buf_index + offset; + int index; + + if (f->pfb_index + offset < f->pfb_size) { + return f->pfb[f->pfb_index + offset]; + } else { + offset -= f->pfb_size - f->pfb_index; + } + + index = f->buf_index + offset; assert(!qemu_file_is_writable(f)); @@ -1851,7 +1913,7 @@ void qemu_savevm_state_begin(QEMUFile *f, } se->ops->set_params(params, se->opaque); } - + qemu_put_be32(f, QEMU_VM_FILE_MAGIC); qemu_put_be32(f, QEMU_VM_FILE_VERSION); @@ -2294,8 +2356,6 @@ int qemu_loadvm_state(QEMUFile *f) } } - cpu_synchronize_all_post_init(); - ret = 0; out: @@ -2311,6 +2371,89 @@ out: return ret; } +int qemu_loadvm_state_ft(QEMUFile *f) /* One fault-tolerance round: read the stream into a growing buffer until the byte sequence FE ED CA FE is seen, install that buffer as f->pfb and run qemu_loadvm_state() on it. Returns the qemu_loadvm_state() result, a nonzero qemu_file_get_error() value, or positive EINVAL/ENOMEM. */ +{ + int ret = 0; + int i = 0; + int j = 0; /* NOTE(review): unused */ + int done = 0; + uint64_t size = 0; + uint64_t count = 0; + uint8_t *pfb = NULL; + uint8_t *buf = NULL; + + uint64_t max_mem = last_ram_offset() * 1.5; /* growth cap for the prefetch buffer */ + + if (!f->ops->get_prefetch_buffer) { + fprintf(stderr, "Fault tolerant is not supported by this protocol.\n"); + return EINVAL; /* NOTE(review): positive, but the caller only stops on negative values; -EINVAL would end its loop */ + } + + size = PFB_SIZE; + pfb = g_malloc(size); + + while (true) { + if (count + TARGET_PAGE_SIZE >= size) { + if (size*2 > max_mem) { 
fprintf(stderr, "qemu_loadvm_state_ft: warning:" \ + "Prefetch buffer becomes too large.\n" \ + "Fault tolerant is unstable when you see this,\n" \ + "please increase the bandwidth or increase " \ + "the max down time.\n"); + break; /* NOTE(review): exits the loop before the end marker is seen, so a truncated round is handed to qemu_loadvm_state() below */ + } + size = size * 2; + buf = g_try_realloc(pfb, size); + if (!buf) { + error_report("qemu_loadvm_state_ft: out of memory.\n"); + g_free(pfb); + return ENOMEM; /* NOTE(review): positive, so the caller keeps looping; -ENOMEM would stop it */ + } + + pfb = buf; + } + + done = qemu_get_buffer(f, pfb + count, TARGET_PAGE_SIZE); /* NOTE(review): if this waits for a full TARGET_PAGE_SIZE bytes, a round ending mid-page is detected only after more data arrives - confirm */ + + ret = qemu_file_get_error(f); + if (ret != 0) { + g_free(pfb); + return ret; + } + + buf = pfb + count; + count += done; + for (i = 0; i < done; i++) { /* look for the end marker FE ED CA FE; the lookbehind may reach into bytes from earlier reads. NOTE(review): at the very start of pfb it can index before pfb[0], and the same byte sequence inside page data would end the round early */ + if (buf[i] != 0xfe) { + continue; + } + if (buf[i-1] != 0xCa) { + continue; + } + if (buf[i-2] != 0xed) { + continue; + } + if (buf[i-3] == 0xFe) { + goto out; + } + } + } + out: + if (f->pfb) { + free(f->pfb); /* NOTE(review): f->pfb is allocated with g_malloc()/g_try_realloc() in this function, so g_free() should be used, as qemu_fclose() does */ + } + f->pfb_size = count; + f->pfb_index = 0; + f->pfb = pfb; + + ret = qemu_loadvm_state(f); + + /* Skip the 4-byte end marker (presumably the FE ED CA FE sequence found above) */ + qemu_get_be32(f); + + return ret; +} + static BlockDriverState *find_vmstate_bs(void) { BlockDriverState *bs = NULL; @@ -2419,6 +2562,7 @@ void do_savevm(Monitor *mon, const QDict *qdict) goto the_end; } ret = qemu_savevm_state(f); + cpu_synchronize_all_post_init(); /* NOTE(review): added on the save path while it was removed from qemu_loadvm_state(); confirm it was not meant for a load path */ vm_state_size = qemu_ftell(f); qemu_fclose(f); if (ret < 0) { -- 1.8.0.1