Re: [Qemu-devel] [PATCH v10 04/14] migration: Start of multiple fd work

2018-01-23 Thread Dr. David Alan Gilbert
* Juan Quintela (quint...@redhat.com) wrote:
> We create new channels for each new thread created. We send through
> them in a packed struct.  This way we can check we connect the right
> channels in both sides.
> 
> Signed-off-by: Juan Quintela 

General comment; given it's reasonably large, it would be nice to split
this into a send and a receive patch.
Some tracing wouldn't do any harm - it's going to be fun debugging
failures where some sockets connect and then fail, so tracing would
help.


> --
> Split SocketArgs into incoming and outgoing args
> 
> Use UUID's on the initial message, so we are sure we are connecting to
> the right channel.
> 
> Remove init semaphore.  Now that we use uuids on the init message, we
> know that this is our channel.
> 
> Fix recv socket destwroy, we were destroying send channels.
> This was very interesting, because we were using an unreferred object
> without problems.
> 
> Move to struct of pointers
> init channel sooner.
> split recv thread creation.
> listen on main thread
> We count the number of created threads to know when we need to stop listening
> Use g_strdup_printf
> report channel id on errors
> Add name parameter
> Use local_err
> Add Error * parameter to socket_send_channel_create()
> Use qio_channel_*_all
> Use asynchronous connect
> Use an struct to send all fields
> Use default uuid
> Fix local_err = NULL (dave)
> Make lines 80 lines long (checkpatch)
> Move multifd_new_channel() and multifd_recv_thread() to later patches
> when used.
> Add __attribute__(packad)
> Use UUIDs are opaques isntead of the ASCII represantation
> rename migrate_new_channel_async to migrate_new_send_channel_async
> rename recv_channel_destroy to _unref.  And create the pairing _ref.
> ---
>  migration/migration.c |   7 +++-
>  migration/ram.c   | 114 
> +++---
>  migration/ram.h   |   3 ++
>  migration/socket.c|  39 -
>  migration/socket.h|  10 +
>  5 files changed, 137 insertions(+), 36 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index e506b9c2c6..77fc17f723 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -426,7 +426,7 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
>  QEMUFile *f = qemu_fopen_channel_input(ioc);
>  migration_fd_process_incoming(f);
>  }
> -/* We still only have a single channel.  Nothing to do here yet */
> +multifd_recv_new_channel(ioc);

OK, that looks a little odd at the moment - do we grow the
recv_new_channel and still leave the first one in there?

>  }
>  
>  /**
> @@ -437,6 +437,11 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
>   */
>  bool migration_has_all_channels(void)
>  {
> +if (migrate_use_multifd()) {
> +int thread_count = migrate_multifd_channels();
> +
> +return thread_count == multifd_created_channels();
> +}
>  return true;
>  }
>  
> diff --git a/migration/ram.c b/migration/ram.c
> index 5a109efeda..aef5a323f3 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -36,6 +36,7 @@
>  #include "xbzrle.h"
>  #include "ram.h"
>  #include "migration.h"
> +#include "socket.h"
>  #include "migration/register.h"
>  #include "migration/misc.h"
>  #include "qemu-file.h"
> @@ -49,6 +50,8 @@
>  #include "qemu/rcu_queue.h"
>  #include "migration/colo.h"
>  #include "migration/block.h"
> +#include "sysemu/sysemu.h"
> +#include "qemu/uuid.h"
>  
>  /***/
>  /* ram save/restore */
> @@ -396,6 +399,7 @@ struct MultiFDSendParams {
>  uint8_t id;
>  char *name;
>  QemuThread thread;
> +QIOChannel *c;
>  QemuSemaphore sem;
>  QemuMutex mutex;
>  bool quit;
> @@ -412,6 +416,15 @@ static void terminate_multifd_send_threads(Error *errp)
>  {
>  int i;
>  
> +if (errp) {
> +MigrationState *s = migrate_get_current();
> +migrate_set_error(s, errp);
> +if (s->state == MIGRATION_STATUS_SETUP ||
> +s->state == MIGRATION_STATUS_ACTIVE) {
> +migrate_set_state(>state, s->state,
> +  MIGRATION_STATUS_FAILED);
> +}
> +}
>  for (i = 0; i < multifd_send_state->count; i++) {
>  MultiFDSendParams *p = _send_state->params[i];
>  
> @@ -437,6 +450,7 @@ int multifd_save_cleanup(Error **errp)
>  qemu_thread_join(>thread);
>  qemu_mutex_destroy(>mutex);
>  qemu_sem_destroy(>sem);
> +socket_send_channel_destroy(p->c);
>  g_free(p->name);
>  p->name = NULL;
>  }
> @@ -447,9 +461,27 @@ int multifd_save_cleanup(Error **errp)
>  return ret;
>  }
>  
> +typedef struct {
> +uint32_t version;
> +unsigned char uuid[16]; /* QemuUUID */
> +uint8_t id;
> +} __attribute__((packed)) MultiFDInit_t;
> +
>  static void *multifd_send_thread(void *opaque)
>  {
>  MultiFDSendParams *p = opaque;
> +

Re: [Qemu-devel] [PATCH v10 04/14] migration: Start of multiple fd work

2018-01-21 Thread Peter Xu
On Wed, Jan 10, 2018 at 01:47:13PM +0100, Juan Quintela wrote:
> We create new channels for each new thread created. We send through
> them in a packed struct.  This way we can check we connect the right
> channels in both sides.
> 
> Signed-off-by: Juan Quintela 
> 
> --
> Split SocketArgs into incoming and outgoing args
> 
> Use UUID's on the initial message, so we are sure we are connecting to
> the right channel.
> 
> Remove init semaphore.  Now that we use uuids on the init message, we
> know that this is our channel.
> 
> Fix recv socket destwroy, we were destroying send channels.
> This was very interesting, because we were using an unreferred object
> without problems.
> 
> Move to struct of pointers
> init channel sooner.
> split recv thread creation.
> listen on main thread
> We count the number of created threads to know when we need to stop listening
> Use g_strdup_printf
> report channel id on errors
> Add name parameter
> Use local_err
> Add Error * parameter to socket_send_channel_create()
> Use qio_channel_*_all
> Use asynchronous connect
> Use an struct to send all fields
> Use default uuid
> Fix local_err = NULL (dave)
> Make lines 80 lines long (checkpatch)
> Move multifd_new_channel() and multifd_recv_thread() to later patches
> when used.
> Add __attribute__(packad)
> Use UUIDs are opaques isntead of the ASCII represantation
> rename migrate_new_channel_async to migrate_new_send_channel_async
> rename recv_channel_destroy to _unref.  And create the pairing _ref.
> ---
>  migration/migration.c |   7 +++-
>  migration/ram.c   | 114 
> +++---
>  migration/ram.h   |   3 ++
>  migration/socket.c|  39 -
>  migration/socket.h|  10 +
>  5 files changed, 137 insertions(+), 36 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index e506b9c2c6..77fc17f723 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -426,7 +426,7 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
>  QEMUFile *f = qemu_fopen_channel_input(ioc);
>  migration_fd_process_incoming(f);
>  }
> -/* We still only have a single channel.  Nothing to do here yet */
> +multifd_recv_new_channel(ioc);
>  }
>  
>  /**
> @@ -437,6 +437,11 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
>   */
>  bool migration_has_all_channels(void)
>  {
> +if (migrate_use_multifd()) {
> +int thread_count = migrate_multifd_channels();
> +
> +return thread_count == multifd_created_channels();
> +}
>  return true;
>  }
>  
> diff --git a/migration/ram.c b/migration/ram.c
> index 5a109efeda..aef5a323f3 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -36,6 +36,7 @@
>  #include "xbzrle.h"
>  #include "ram.h"
>  #include "migration.h"
> +#include "socket.h"
>  #include "migration/register.h"
>  #include "migration/misc.h"
>  #include "qemu-file.h"
> @@ -49,6 +50,8 @@
>  #include "qemu/rcu_queue.h"
>  #include "migration/colo.h"
>  #include "migration/block.h"
> +#include "sysemu/sysemu.h"
> +#include "qemu/uuid.h"
>  
>  /***/
>  /* ram save/restore */
> @@ -396,6 +399,7 @@ struct MultiFDSendParams {
>  uint8_t id;
>  char *name;
>  QemuThread thread;
> +QIOChannel *c;
>  QemuSemaphore sem;
>  QemuMutex mutex;
>  bool quit;
> @@ -412,6 +416,15 @@ static void terminate_multifd_send_threads(Error *errp)
>  {
>  int i;
>  
> +if (errp) {
> +MigrationState *s = migrate_get_current();
> +migrate_set_error(s, errp);
> +if (s->state == MIGRATION_STATUS_SETUP ||
> +s->state == MIGRATION_STATUS_ACTIVE) {
> +migrate_set_state(>state, s->state,
> +  MIGRATION_STATUS_FAILED);

I'm fine with it, but could I ask why we explicitly pass the error
into this function and handle the state machine transition here?
Asked since IMHO we know the error already when calling
terminate_multifd_send_threads() because we passed it in as parameter,
then why not we do that there?

[1]

> +}
> +}
>  for (i = 0; i < multifd_send_state->count; i++) {
>  MultiFDSendParams *p = _send_state->params[i];
>  
> @@ -437,6 +450,7 @@ int multifd_save_cleanup(Error **errp)
>  qemu_thread_join(>thread);
>  qemu_mutex_destroy(>mutex);
>  qemu_sem_destroy(>sem);
> +socket_send_channel_destroy(p->c);
>  g_free(p->name);
>  p->name = NULL;
>  }
> @@ -447,9 +461,27 @@ int multifd_save_cleanup(Error **errp)
>  return ret;
>  }
>  
> +typedef struct {
> +uint32_t version;
> +unsigned char uuid[16]; /* QemuUUID */
> +uint8_t id;
> +} __attribute__((packed)) MultiFDInit_t;
> +
>  static void *multifd_send_thread(void *opaque)
>  {
>  MultiFDSendParams *p = opaque;
> +MultiFDInit_t msg;
> +Error *local_err = NULL;
> +

[Qemu-devel] [PATCH v10 04/14] migration: Start of multiple fd work

2018-01-10 Thread Juan Quintela
We create new channels for each new thread created. We send through
them in a packed struct.  This way we can check we connect the right
channels in both sides.

Signed-off-by: Juan Quintela 

--
Split SocketArgs into incoming and outgoing args

Use UUID's on the initial message, so we are sure we are connecting to
the right channel.

Remove init semaphore.  Now that we use uuids on the init message, we
know that this is our channel.

Fix recv socket destwroy, we were destroying send channels.
This was very interesting, because we were using an unreferred object
without problems.

Move to struct of pointers
init channel sooner.
split recv thread creation.
listen on main thread
We count the number of created threads to know when we need to stop listening
Use g_strdup_printf
report channel id on errors
Add name parameter
Use local_err
Add Error * parameter to socket_send_channel_create()
Use qio_channel_*_all
Use asynchronous connect
Use an struct to send all fields
Use default uuid
Fix local_err = NULL (dave)
Make lines 80 lines long (checkpatch)
Move multifd_new_channel() and multifd_recv_thread() to later patches
when used.
Add __attribute__(packad)
Use UUIDs are opaques isntead of the ASCII represantation
rename migrate_new_channel_async to migrate_new_send_channel_async
rename recv_channel_destroy to _unref.  And create the pairing _ref.
---
 migration/migration.c |   7 +++-
 migration/ram.c   | 114 +++---
 migration/ram.h   |   3 ++
 migration/socket.c|  39 -
 migration/socket.h|  10 +
 5 files changed, 137 insertions(+), 36 deletions(-)

diff --git a/migration/migration.c b/migration/migration.c
index e506b9c2c6..77fc17f723 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -426,7 +426,7 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
 QEMUFile *f = qemu_fopen_channel_input(ioc);
 migration_fd_process_incoming(f);
 }
-/* We still only have a single channel.  Nothing to do here yet */
+multifd_recv_new_channel(ioc);
 }
 
 /**
@@ -437,6 +437,11 @@ void migration_ioc_process_incoming(QIOChannel *ioc)
  */
 bool migration_has_all_channels(void)
 {
+if (migrate_use_multifd()) {
+int thread_count = migrate_multifd_channels();
+
+return thread_count == multifd_created_channels();
+}
 return true;
 }
 
diff --git a/migration/ram.c b/migration/ram.c
index 5a109efeda..aef5a323f3 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -36,6 +36,7 @@
 #include "xbzrle.h"
 #include "ram.h"
 #include "migration.h"
+#include "socket.h"
 #include "migration/register.h"
 #include "migration/misc.h"
 #include "qemu-file.h"
@@ -49,6 +50,8 @@
 #include "qemu/rcu_queue.h"
 #include "migration/colo.h"
 #include "migration/block.h"
+#include "sysemu/sysemu.h"
+#include "qemu/uuid.h"
 
 /***/
 /* ram save/restore */
@@ -396,6 +399,7 @@ struct MultiFDSendParams {
 uint8_t id;
 char *name;
 QemuThread thread;
+QIOChannel *c;
 QemuSemaphore sem;
 QemuMutex mutex;
 bool quit;
@@ -412,6 +416,15 @@ static void terminate_multifd_send_threads(Error *errp)
 {
 int i;
 
+if (errp) {
+MigrationState *s = migrate_get_current();
+migrate_set_error(s, errp);
+if (s->state == MIGRATION_STATUS_SETUP ||
+s->state == MIGRATION_STATUS_ACTIVE) {
+migrate_set_state(>state, s->state,
+  MIGRATION_STATUS_FAILED);
+}
+}
 for (i = 0; i < multifd_send_state->count; i++) {
 MultiFDSendParams *p = _send_state->params[i];
 
@@ -437,6 +450,7 @@ int multifd_save_cleanup(Error **errp)
 qemu_thread_join(>thread);
 qemu_mutex_destroy(>mutex);
 qemu_sem_destroy(>sem);
+socket_send_channel_destroy(p->c);
 g_free(p->name);
 p->name = NULL;
 }
@@ -447,9 +461,27 @@ int multifd_save_cleanup(Error **errp)
 return ret;
 }
 
+typedef struct {
+uint32_t version;
+unsigned char uuid[16]; /* QemuUUID */
+uint8_t id;
+} __attribute__((packed)) MultiFDInit_t;
+
 static void *multifd_send_thread(void *opaque)
 {
 MultiFDSendParams *p = opaque;
+MultiFDInit_t msg;
+Error *local_err = NULL;
+size_t ret;
+
+msg.version = 1;
+msg.id = p->id;
+memcpy(msg.uuid, _uuid.data, sizeof(msg.uuid));
+ret = qio_channel_write_all(p->c, (char *), sizeof(msg), _err);
+if (ret != 0) {
+terminate_multifd_send_threads(local_err);
+return NULL;
+}
 
 while (true) {
 qemu_mutex_lock(>mutex);
@@ -464,6 +496,27 @@ static void *multifd_send_thread(void *opaque)
 return NULL;
 }
 
+static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
+{
+MultiFDSendParams *p = opaque;
+QIOChannel *sioc = QIO_CHANNEL(qio_task_get_source(task));
+Error *local_err = NULL;
+
+if