On 08/11/2011 05:32 PM, Umesh Deshpande wrote:
This patch creates a separate thread for the guest migration on the source side.
A migrate_cancel request from the iothread is handled asynchronously. That is,
the iothread submits migrate_cancel to the migration thread and returns, while
the migration thread attends to this request at the next iteration to terminate
its execution.
Looks pretty good! I hope you agree. :) Just one note inside.
Signed-off-by: Umesh Deshpande<[email protected]>
---
buffered_file.c | 85 ++++++++++++++++++++++++++++++++----------------------
buffered_file.h | 4 ++
migration.c | 49 ++++++++++++++-----------------
migration.h | 6 ++++
4 files changed, 82 insertions(+), 62 deletions(-)
diff --git a/buffered_file.c b/buffered_file.c
index 41b42c3..19932b6 100644
--- a/buffered_file.c
+++ b/buffered_file.c
@@ -16,6 +16,8 @@
#include "qemu-timer.h"
#include "qemu-char.h"
#include "buffered_file.h"
+#include "migration.h"
+#include "qemu-thread.h"
//#define DEBUG_BUFFERED_FILE
@@ -28,13 +30,14 @@ typedef struct QEMUFileBuffered
void *opaque;
QEMUFile *file;
int has_error;
+ int closed;
int freeze_output;
size_t bytes_xfer;
size_t xfer_limit;
uint8_t *buffer;
size_t buffer_size;
size_t buffer_capacity;
- QEMUTimer *timer;
+ QemuThread thread;
} QEMUFileBuffered;
#ifdef DEBUG_BUFFERED_FILE
@@ -155,14 +158,6 @@ static int buffered_put_buffer(void *opaque, const uint8_t
*buf, int64_t pos, in
offset = size;
}
- if (pos == 0&& size == 0) {
- DPRINTF("file is ready\n");
- if (s->bytes_xfer<= s->xfer_limit) {
- DPRINTF("notifying client\n");
- s->put_ready(s->opaque);
- }
- }
-
return offset;
}
@@ -175,20 +170,20 @@ static int buffered_close(void *opaque)
while (!s->has_error&& s->buffer_size) {
buffered_flush(s);
- if (s->freeze_output)
+ if (s->freeze_output) {
s->wait_for_unfreeze(s);
+ }
}
This is racy; you might end up calling buffered_put_buffer twice from
two different threads.
- ret = s->close(s->opaque);
+ s->closed = 1;
- qemu_del_timer(s->timer);
- qemu_free_timer(s->timer);
+ ret = s->close(s->opaque);
qemu_free(s->buffer);
- qemu_free(s);
... similarly, here the migration thread might end up using the buffer.
Just set s->closed here and wait for thread completion; the migration
thread can handle the flushes, free the buffer, etc. Let the migration
thread do as much as possible; it will simplify your life.
return ret;
}
+
static int buffered_rate_limit(void *opaque)
{
QEMUFileBuffered *s = opaque;
@@ -228,34 +223,55 @@ static int64_t buffered_get_rate_limit(void *opaque)
return s->xfer_limit;
}
-static void buffered_rate_tick(void *opaque)
+static void *migrate_vm(void *opaque)
{
QEMUFileBuffered *s = opaque;
+ int64_t current_time, expire_time = qemu_get_clock_ms(rt_clock) + 100;
+ struct timeval tv = { .tv_sec = 0, .tv_usec = 100000};
- if (s->has_error) {
- buffered_close(s);
- return;
- }
+ qemu_mutex_lock_iothread();
- qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
+ while (!s->closed) {
... This can in fact be
while (!s->closed || s->buffer_size)
and that alone will subsume the loop in buffered_close, no?
+ if (s->freeze_output) {
+ s->wait_for_unfreeze(s);
+ s->freeze_output = 0;
+ continue;
+ }
- if (s->freeze_output)
- return;
+ if (s->has_error) {
+ break;
+ }
+
+ current_time = qemu_get_clock_ms(rt_clock);
+ if (!s->closed&& (expire_time> current_time)) {
+ tv.tv_usec = 1000 * (expire_time - current_time);
+ select(0, NULL, NULL, NULL,&tv);
+ continue;
+ }
- s->bytes_xfer = 0;
+ s->bytes_xfer = 0;
+ buffered_flush(s);
- buffered_flush(s);
+ expire_time = qemu_get_clock_ms(rt_clock) + 100;
+ s->put_ready(s->opaque);
+ }
- /* Add some checks around this */
- s->put_ready(s->opaque);
+ if (s->has_error) {
+ buffered_close(s);
+ }
+ qemu_free(s);
+
+ qemu_mutex_unlock_iothread();
+
+ return NULL;
}
QEMUFile *qemu_fopen_ops_buffered(void *opaque,
- size_t bytes_per_sec,
- BufferedPutFunc *put_buffer,
- BufferedPutReadyFunc *put_ready,
- BufferedWaitForUnfreezeFunc
*wait_for_unfreeze,
- BufferedCloseFunc *close)
+ size_t bytes_per_sec,
+ BufferedPutFunc *put_buffer,
+ BufferedPutReadyFunc *put_ready,
+ BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
+ BufferedCloseFunc *close)
{
QEMUFileBuffered *s;
@@ -267,15 +283,14 @@ QEMUFile *qemu_fopen_ops_buffered(void *opaque,
s->put_ready = put_ready;
s->wait_for_unfreeze = wait_for_unfreeze;
s->close = close;
+ s->closed = 0;
s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
buffered_close, buffered_rate_limit,
buffered_set_rate_limit,
- buffered_get_rate_limit);
-
- s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
+ buffered_get_rate_limit);
- qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
+ qemu_thread_create(&s->thread, migrate_vm, s);
return s->file;
}
diff --git a/buffered_file.h b/buffered_file.h
index 98d358b..477bf7c 100644
--- a/buffered_file.h
+++ b/buffered_file.h
@@ -17,9 +17,13 @@
#include "hw/hw.h"
typedef ssize_t (BufferedPutFunc)(void *opaque, const void *data, size_t
size);
+typedef void (BufferedBeginFunc)(void *opaque);
Unused typedef.
typedef void (BufferedPutReadyFunc)(void *opaque);
typedef void (BufferedWaitForUnfreezeFunc)(void *opaque);
typedef int (BufferedCloseFunc)(void *opaque);
+typedef void (BufferedWaitForCancelFunc)(void *opaque);
+
+void wait_for_cancel(void *opaque);
BufferedWaitForCancelFunc should go in patch 2; wait_for_cancel is unused.
QEMUFile *qemu_fopen_ops_buffered(void *opaque, size_t xfer_limit,
BufferedPutFunc *put_buffer,
diff --git a/migration.c b/migration.c
index af3a1f2..d8a0abb 100644
--- a/migration.c
+++ b/migration.c
@@ -284,8 +284,6 @@ int migrate_fd_cleanup(FdMigrationState *s)
{
int ret = 0;
- qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
-
if (s->file) {
DPRINTF("closing file\n");
if (qemu_fclose(s->file) != 0) {
@@ -307,14 +305,6 @@ int migrate_fd_cleanup(FdMigrationState *s)
return ret;
}
-void migrate_fd_put_notify(void *opaque)
-{
- FdMigrationState *s = opaque;
-
- qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
- qemu_file_put_notify(s->file);
-}
-
qemu_file_put_notify is also unused now.
ssize_t migrate_fd_put_buffer(void *opaque, const void *data, size_t size)
{
FdMigrationState *s = opaque;
@@ -327,9 +317,7 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void
*data, size_t size)
if (ret == -1)
ret = -(s->get_error(s));
- if (ret == -EAGAIN) {
- qemu_set_fd_handler2(s->fd, NULL, NULL, migrate_fd_put_notify, s);
- } else if (ret< 0) {
+ if (ret< 0&& ret != -EAGAIN) {
if (s->mon) {
monitor_resume(s->mon);
}
@@ -342,36 +330,40 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void
*data, size_t size)
void migrate_fd_connect(FdMigrationState *s)
{
- int ret;
-
+ s->begin = 1;
s->file = qemu_fopen_ops_buffered(s,
s->bandwidth_limit,
migrate_fd_put_buffer,
migrate_fd_put_ready,
migrate_fd_wait_for_unfreeze,
migrate_fd_close);
-
- DPRINTF("beginning savevm\n");
- ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
- s->mig_state.shared);
- if (ret< 0) {
- DPRINTF("failed, %d\n", ret);
- migrate_fd_error(s);
- return;
- }
-
- migrate_fd_put_ready(s);
}
void migrate_fd_put_ready(void *opaque)
{
FdMigrationState *s = opaque;
+ int ret;
if (s->state != MIG_STATE_ACTIVE) {
DPRINTF("put_ready returning because of non-active state\n");
+ if (s->state == MIG_STATE_CANCELLED) {
+ migrate_fd_terminate(s);
+ }
return;
}
+ if (s->begin) {
+ DPRINTF("beginning savevm\n");
+ ret = qemu_savevm_state_begin(s->mon, s->file, s->mig_state.blk,
+ s->mig_state.shared);
+ if (ret< 0) {
+ DPRINTF("failed, %d\n", ret);
+ migrate_fd_error(s);
+ return;
+ }
+ s->begin = 0;
+ }
+
DPRINTF("iterate\n");
if (qemu_savevm_state_iterate(s->mon, s->file) == 1) {
int state;
@@ -415,6 +407,10 @@ void migrate_fd_cancel(MigrationState *mig_state)
DPRINTF("cancelling migration\n");
s->state = MIG_STATE_CANCELLED;
+}
+
+void migrate_fd_terminate(FdMigrationState *s)
+{
notifier_list_notify(&migration_state_notifiers);
qemu_savevm_state_cancel(s->mon, s->file);
@@ -458,7 +454,6 @@ int migrate_fd_close(void *opaque)
{
FdMigrationState *s = opaque;
- qemu_set_fd_handler2(s->fd, NULL, NULL, NULL, NULL);
return s->close(s);
}
diff --git a/migration.h b/migration.h
index 050c56c..887f84c 100644
--- a/migration.h
+++ b/migration.h
@@ -45,9 +45,11 @@ struct FdMigrationState
int fd;
Monitor *mon;
int state;
+ int begin;
int (*get_error)(struct FdMigrationState*);
int (*close)(struct FdMigrationState*);
int (*write)(struct FdMigrationState*, const void *, size_t);
+ void (*callback)(void *);
void *opaque;
};
@@ -118,12 +120,16 @@ ssize_t migrate_fd_put_buffer(void *opaque, const void
*data, size_t size);
void migrate_fd_connect(FdMigrationState *s);
+void migrate_fd_begin(void *opaque);
+
void migrate_fd_put_ready(void *opaque);
int migrate_fd_get_status(MigrationState *mig_state);
void migrate_fd_cancel(MigrationState *mig_state);
+void migrate_fd_terminate(FdMigrationState *s);
+
void migrate_fd_release(MigrationState *mig_state);
void migrate_fd_wait_for_unfreeze(void *opaque);
--
To unsubscribe from this list: send the line "unsubscribe kvm" in
the body of a message to [email protected]
More majordomo info at http://vger.kernel.org/majordomo-info.html