Re: [Qemu-devel] [PATCH RFC 4/4] Curling: the receiver

2013-09-11 Thread junqing . wang
hi,


At 2013-09-10 22:19:48,Juan Quintela quint...@redhat.com wrote:

 @@ -112,13 +113,24 @@ static void process_incoming_migration_co(void *opaque)
  {
  QEMUFile *f = opaque;
  int ret;
 +int count = 0;
  
 -ret = qemu_loadvm_state(f);
 -qemu_fclose(f);
 -if (ret  0) {
 -fprintf(stderr, load of migration failed\n);
 -exit(EXIT_FAILURE);
 +if (ft_enabled()) {
 +while (qemu_loadvm_state_ft(f) = 0) {
 +count++;
 +DPRINTF(incoming count %d\r, count);
 +}
 +qemu_fclose(f);
 +fprintf(stderr, ft connection lost, launching self..\n);

Obviously, here we need something more than an fprintf, right?

We are also not checking whether it is an error.

Agree.

 +} else {
 +ret = qemu_loadvm_state(f);
 +qemu_fclose(f);
 +if (ret  0) {
 +fprintf(stderr, load of migration failed\n);
 +exit(EXIT_FAILURE);
 +}
  }
 +cpu_synchronize_all_post_init();
  qemu_announce_self();
  DPRINTF(successfully loaded vm state\n);
  
 diff --git a/savevm.c b/savevm.c
 index 6daf690..d5bf153 100644
 --- a/savevm.c
 +++ b/savevm.c
 @@ -52,6 +52,8 @@
  #define ARP_PTYPE_IP 0x0800
  #define ARP_OP_REQUEST_REV 0x3
  
 +#define PFB_SIZE 0x01
 +
  static int announce_self_create(uint8_t *buf,
  uint8_t *mac_addr)
  {
 @@ -135,6 +137,10 @@ struct QEMUFile {
  unsigned int iovcnt;
  
  int last_error;
 +
 +uint8_t *pfb;   /* pfb - PerFetch Buffer */

s/PerFetch/Prefetch/

Use prefetch_buffer as the name?  It is not used in that many places; would it
make things clearer or more convoluted?  Other comments?


Agree.

 +static int socket_get_prefetch_buffer(void *opaque, uint8_t *buf,
 +  int64_t pos, int size)
 +{
 +QEMUFile *f = opaque;
 +
 +if (f-pfb_size - pos = 0) {
 +return 0;
 +}
 +
 +if (f-pfb_size - pos  size) {
 +size = f-pfb_size - pos;
 +}
 +
 +memcpy(buf, f-pfb+pos, size);
 +
 +return size;
 +}
 +
 +
  static int socket_close(void *opaque)
  {
  QEMUFileSocket *s = opaque;
 @@ -440,6 +465,7 @@ QEMUFile *qemu_fdopen(int fd, const char *mode)
  static const QEMUFileOps socket_read_ops = {
  .get_fd = socket_get_fd,
  .get_buffer = socket_get_buffer,
 +.get_prefetch_buffer = socket_get_prefetch_buffer,
  .close =  socket_close
  };
  

  if (f-last_error) {
  ret = f-last_error;
  }
 +
 +if (f-pfb) {
 +g_free(f-pfb);

g_free(f-pfb);
It already checks for NULL.

Got it.

 +}
 +
  g_free(f);
  return ret;
  }
 @@ -822,6 +853,14 @@ void qemu_put_byte(QEMUFile *f, int v)
  
  static void qemu_file_skip(QEMUFile *f, int size)
  {
 +if (f-pfb_index + size = f-pfb_size) {
 +f-pfb_index += size;
 +return;
 +} else {
 +size -= f-pfb_size - f-pfb_index;
 +f-pfb_index = f-pfb_size;
 +}
 +
  if (f-buf_index + size = f-buf_size) {
  f-buf_index += size;
  }
 @@ -831,6 +870,21 @@ static int qemu_peek_buffer(QEMUFile *f, uint8_t *buf, 
 int size, size_t offset)
  {
  int pending;
  int index;
 +int done;
 +
 +if (f-ops-get_prefetch_buffer) {
 +if (f-pfb_index + offset  f-pfb_size) {
 +done = f-ops-get_prefetch_buffer(f, buf, f-pfb_index + 
 offset,
 +   size);
 +if (done == size) {
 +return size;
 +}
 +size -= done;
 +buf  += done;
 +} else {
 +offset -= f-pfb_size - f-pfb_index;
 +}
 +}
  
  assert(!qemu_file_is_writable(f));
  
 @@ -875,7 +929,15 @@ int qemu_get_buffer(QEMUFile *f, uint8_t *buf, int size)
  
  static int qemu_peek_byte(QEMUFile *f, int offset)
  {
 -int index = f-buf_index + offset;
 +int index;
 +
 +if (f-pfb_index + offset  f-pfb_size) {
 +return f-pfb[f-pfb_index + offset];
 +} else {
 +offset -= f-pfb_size - f-pfb_index;
 +}
 +
 +index = f-buf_index + offset;
  
  assert(!qemu_file_is_writable(f));
  
 @@ -1851,7 +1913,7 @@ void qemu_savevm_state_begin(QEMUFile *f,
  }
  se-ops-set_params(params, se-opaque);
  }
 -
 +
  qemu_put_be32(f, QEMU_VM_FILE_MAGIC);
  qemu_put_be32(f, QEMU_VM_FILE_VERSION);
  
 @@ -2294,8 +2356,6 @@ int qemu_loadvm_state(QEMUFile *f)
  }
  }
  
 -cpu_synchronize_all_post_init();
 -
  ret = 0;
  
  out:
 @@ -2311,6 +2371,89 @@ out:
  return ret;
  }
  
 +int qemu_loadvm_state_ft(QEMUFile *f)
 +{
 +int ret = 0;
 +int i   = 0;
 +int j   = 0;
 +int done = 0;
 +uint64_t size = 0;
 +uint64_t count = 0;
 +uint8_t *pfb = NULL;
 +uint8_t *buf = NULL;
 +
 +uint64_t max_mem = last_ram_offset() * 1.5;
 +
 +if (!f-ops-get_prefetch_buffer) {
 +fprintf(stderr, Fault tolerant is not supported by 

Re: [Qemu-devel] [PATCH RFC 4/4] Curling: the receiver

2013-09-10 Thread Juan Quintela
Jules Wang junqing.w...@cs2c.com.cn wrote:
 The receiver does migration loop until the migration connection is
 lost. Then, it is started as a backup.

 The receiver does not load vm state once a migration begins,
 instead, it prefetches one whole migration data into a buffer,
 then loads vm state from that buffer afterwards.

 Signed-off-by: Jules Wang junqing.w...@cs2c.com.cn
 ---
  include/migration/qemu-file.h |   1 +
  include/sysemu/sysemu.h   |   1 +
  migration.c   |  22 --
  savevm.c  | 154 
 --
  4 files changed, 168 insertions(+), 10 deletions(-)

 diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h
 index 0f757fb..f01ff10 100644
 --- a/include/migration/qemu-file.h
 +++ b/include/migration/qemu-file.h
 @@ -92,6 +92,7 @@ typedef struct QEMUFileOps {
  QEMURamHookFunc *after_ram_iterate;
  QEMURamHookFunc *hook_ram_load;
  QEMURamSaveFunc *save_page;
 +QEMUFileGetBufferFunc *get_prefetch_buffer;
  } QEMUFileOps;
  
  QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops);
 diff --git a/include/sysemu/sysemu.h b/include/sysemu/sysemu.h
 index b1aa059..44f23d0 100644
 --- a/include/sysemu/sysemu.h
 +++ b/include/sysemu/sysemu.h
 @@ -81,6 +81,7 @@ void qemu_savevm_state_complete(QEMUFile *f);
  void qemu_savevm_state_cancel(void);
  uint64_t qemu_savevm_state_pending(QEMUFile *f, uint64_t max_size);
  int qemu_loadvm_state(QEMUFile *f);
 +int qemu_loadvm_state_ft(QEMUFile *f);
  
  /* SLIRP */
  void do_info_slirp(Monitor *mon);
 diff --git a/migration.c b/migration.c
 index d8a9b2d..9be22a4 100644
 --- a/migration.c
 +++ b/migration.c
 @@ -19,6 +19,7 @@
  #include monitor/monitor.h
  #include migration/qemu-file.h
  #include sysemu/sysemu.h
 +#include sysemu/cpus.h
  #include block/block.h
  #include qemu/sockets.h
  #include migration/block.h
 @@ -112,13 +113,24 @@ static void process_incoming_migration_co(void *opaque)
  {
  QEMUFile *f = opaque;
  int ret;
 +int count = 0;
  
 -ret = qemu_loadvm_state(f);
 -qemu_fclose(f);
 -if (ret  0) {
 -fprintf(stderr, load of migration failed\n);
 -exit(EXIT_FAILURE);
 +if (ft_enabled()) {
 +while (qemu_loadvm_state_ft(f) = 0) {
 +count++;
 +DPRINTF(incoming count %d\r, count);
 +}
 +qemu_fclose(f);
 +fprintf(stderr, ft connection lost, launching self..\n);

Obviously, here we need something more than an fprintf, right?

We are also not checking whether it is an error.

 +} else {
 +ret = qemu_loadvm_state(f);
 +qemu_fclose(f);
 +if (ret  0) {
 +fprintf(stderr, load of migration failed\n);
 +exit(EXIT_FAILURE);
 +}
  }
 +cpu_synchronize_all_post_init();
  qemu_announce_self();
  DPRINTF(successfully loaded vm state\n);
  
 diff --git a/savevm.c b/savevm.c
 index 6daf690..d5bf153 100644
 --- a/savevm.c
 +++ b/savevm.c
 @@ -52,6 +52,8 @@
  #define ARP_PTYPE_IP 0x0800
  #define ARP_OP_REQUEST_REV 0x3
  
 +#define PFB_SIZE 0x01
 +
  static int announce_self_create(uint8_t *buf,
   uint8_t *mac_addr)
  {
 @@ -135,6 +137,10 @@ struct QEMUFile {
  unsigned int iovcnt;
  
  int last_error;
 +
 +uint8_t *pfb;   /* pfb - PerFetch Buffer */

s/PerFetch/Prefetch/

Use prefetch_buffer as the name?  It is not used in that many places; would it
make things clearer or more convoluted?  Other comments?

 +static int socket_get_prefetch_buffer(void *opaque, uint8_t *buf,
 +  int64_t pos, int size)
 +{
 +QEMUFile *f = opaque;
 +
 +if (f-pfb_size - pos = 0) {
 +return 0;
 +}
 +
 +if (f-pfb_size - pos  size) {
 +size = f-pfb_size - pos;
 +}
 +
 +memcpy(buf, f-pfb+pos, size);
 +
 +return size;
 +}
 +
 +
  static int socket_close(void *opaque)
  {
  QEMUFileSocket *s = opaque;
 @@ -440,6 +465,7 @@ QEMUFile *qemu_fdopen(int fd, const char *mode)
  static const QEMUFileOps socket_read_ops = {
  .get_fd = socket_get_fd,
  .get_buffer = socket_get_buffer,
 +.get_prefetch_buffer = socket_get_prefetch_buffer,
  .close =  socket_close
  };
  

  if (f-last_error) {
  ret = f-last_error;
  }
 +
 +if (f-pfb) {
 +g_free(f-pfb);

g_free(f-pfb);
It already checks for NULL.

 +}
 +
  g_free(f);
  return ret;
  }
 @@ -822,6 +853,14 @@ void qemu_put_byte(QEMUFile *f, int v)
  
  static void qemu_file_skip(QEMUFile *f, int size)
  {
 +if (f-pfb_index + size = f-pfb_size) {
 +f-pfb_index += size;
 +return;
 +} else {
 +size -= f-pfb_size - f-pfb_index;
 +f-pfb_index = f-pfb_size;
 +}
 +
  if (f-buf_index + size = f-buf_size) {
  f-buf_index += size;
  }
 @@ -831,6 +870,21 @@ static int qemu_peek_buffer(QEMUFile *f, uint8_t *buf, 
 int size, size_t offset)
  {
  

[Qemu-devel] [PATCH RFC 4/4] Curling: the receiver

2013-09-10 Thread Jules Wang
The receiver does migration loop until the migration connection is
lost. Then, it is started as a backup.

The receiver does not load vm state once a migration begins,
instead, it prefetches one whole migration data into a buffer,
then loads vm state from that buffer afterwards.

Signed-off-by: Jules Wang junqing.w...@cs2c.com.cn
---
 include/migration/qemu-file.h |   1 +
 include/sysemu/sysemu.h   |   1 +
 migration.c   |  22 --
 savevm.c  | 154 --
 4 files changed, 168 insertions(+), 10 deletions(-)

diff --git a/include/migration/qemu-file.h b/include/migration/qemu-file.h
index 0f757fb..f01ff10 100644
--- a/include/migration/qemu-file.h
+++ b/include/migration/qemu-file.h
@@ -92,6 +92,7 @@ typedef struct QEMUFileOps {
 QEMURamHookFunc *after_ram_iterate;
 QEMURamHookFunc *hook_ram_load;
 QEMURamSaveFunc *save_page;
+QEMUFileGetBufferFunc *get_prefetch_buffer;
 } QEMUFileOps;
 
 QEMUFile *qemu_fopen_ops(void *opaque, const QEMUFileOps *ops);
diff --git a/include/sysemu/sysemu.h b/include/sysemu/sysemu.h
index b1aa059..44f23d0 100644
--- a/include/sysemu/sysemu.h
+++ b/include/sysemu/sysemu.h
@@ -81,6 +81,7 @@ void qemu_savevm_state_complete(QEMUFile *f);
 void qemu_savevm_state_cancel(void);
 uint64_t qemu_savevm_state_pending(QEMUFile *f, uint64_t max_size);
 int qemu_loadvm_state(QEMUFile *f);
+int qemu_loadvm_state_ft(QEMUFile *f);
 
 /* SLIRP */
 void do_info_slirp(Monitor *mon);
diff --git a/migration.c b/migration.c
index d8a9b2d..9be22a4 100644
--- a/migration.c
+++ b/migration.c
@@ -19,6 +19,7 @@
 #include monitor/monitor.h
 #include migration/qemu-file.h
 #include sysemu/sysemu.h
+#include sysemu/cpus.h
 #include block/block.h
 #include qemu/sockets.h
 #include migration/block.h
@@ -112,13 +113,24 @@ static void process_incoming_migration_co(void *opaque)
 {
 QEMUFile *f = opaque;
 int ret;
+int count = 0;
 
-ret = qemu_loadvm_state(f);
-qemu_fclose(f);
-if (ret  0) {
-fprintf(stderr, load of migration failed\n);
-exit(EXIT_FAILURE);
+if (ft_enabled()) {
+while (qemu_loadvm_state_ft(f) = 0) {
+count++;
+DPRINTF(incoming count %d\r, count);
+}
+qemu_fclose(f);
+fprintf(stderr, ft connection lost, launching self..\n);
+} else {
+ret = qemu_loadvm_state(f);
+qemu_fclose(f);
+if (ret  0) {
+fprintf(stderr, load of migration failed\n);
+exit(EXIT_FAILURE);
+}
 }
+cpu_synchronize_all_post_init();
 qemu_announce_self();
 DPRINTF(successfully loaded vm state\n);
 
diff --git a/savevm.c b/savevm.c
index 6daf690..d5bf153 100644
--- a/savevm.c
+++ b/savevm.c
@@ -52,6 +52,8 @@
 #define ARP_PTYPE_IP 0x0800
 #define ARP_OP_REQUEST_REV 0x3
 
+#define PFB_SIZE 0x01
+
 static int announce_self_create(uint8_t *buf,
uint8_t *mac_addr)
 {
@@ -135,6 +137,10 @@ struct QEMUFile {
 unsigned int iovcnt;
 
 int last_error;
+
+uint8_t *pfb;   /* pfb - PerFetch Buffer */
+uint64_t pfb_index;
+uint64_t pfb_size;
 };
 
 typedef struct QEMUFileStdio
@@ -193,6 +199,25 @@ static int socket_get_buffer(void *opaque, uint8_t *buf, 
int64_t pos, int size)
 return len;
 }
 
+static int socket_get_prefetch_buffer(void *opaque, uint8_t *buf,
+  int64_t pos, int size)
+{
+QEMUFile *f = opaque;
+
+if (f-pfb_size - pos = 0) {
+return 0;
+}
+
+if (f-pfb_size - pos  size) {
+size = f-pfb_size - pos;
+}
+
+memcpy(buf, f-pfb+pos, size);
+
+return size;
+}
+
+
 static int socket_close(void *opaque)
 {
 QEMUFileSocket *s = opaque;
@@ -440,6 +465,7 @@ QEMUFile *qemu_fdopen(int fd, const char *mode)
 static const QEMUFileOps socket_read_ops = {
 .get_fd = socket_get_fd,
 .get_buffer = socket_get_buffer,
+.get_prefetch_buffer = socket_get_prefetch_buffer,
 .close =  socket_close
 };
 
@@ -493,7 +519,7 @@ QEMUFile *qemu_fopen(const char *filename, const char *mode)
 s-stdio_file = fopen(filename, mode);
 if (!s-stdio_file)
 goto fail;
-
+
 if(mode[0] == 'w') {
 s-file = qemu_fopen_ops(s, stdio_file_write_ops);
 } else {
@@ -739,6 +765,11 @@ int qemu_fclose(QEMUFile *f)
 if (f-last_error) {
 ret = f-last_error;
 }
+
+if (f-pfb) {
+g_free(f-pfb);
+}
+
 g_free(f);
 return ret;
 }
@@ -822,6 +853,14 @@ void qemu_put_byte(QEMUFile *f, int v)
 
 static void qemu_file_skip(QEMUFile *f, int size)
 {
+if (f-pfb_index + size = f-pfb_size) {
+f-pfb_index += size;
+return;
+} else {
+size -= f-pfb_size - f-pfb_index;
+f-pfb_index = f-pfb_size;
+}
+
 if (f-buf_index + size = f-buf_size) {
 f-buf_index += size;
 }
@@ -831,6 +870,21 @@ static int qemu_peek_buffer(QEMUFile *f, uint8_t *buf,