Re: [Qemu-devel] [PATCH v7 15/42] Return path: Source handling of return path

2015-08-18 Thread Dr. David Alan Gilbert
* Juan Quintela (quint...@redhat.com) wrote:
 Dr. David Alan Gilbert (git) dgilb...@redhat.com wrote:
  From: Dr. David Alan Gilbert dgilb...@redhat.com
 
  Open a return path, and handle messages that are received upon it.
 
  Signed-off-by: Dr. David Alan Gilbert dgilb...@redhat.com
  +/*
  + * Handles messages sent on the return path towards the source VM
  + *
  + */
  +static void *source_return_path_thread(void *opaque)
  +{
  +MigrationState *ms = opaque;
  +QEMUFile *rp = ms-rp_state.file;
  +uint16_t expected_len, header_len, header_type;
  +const int max_len = 512;
  +uint8_t buf[max_len];
  +uint32_t tmp32;
  +int res;
  +
  +trace_source_return_path_thread_entry();
  +while (rp  !qemu_file_get_error(rp) 
 
 What can make rp == NULL?
 Thinking about that, could you mean *rp here?

I've reworked this;  it was meant to catch the case of the rp being
closed early, but was racy.

I've now got:
while (!ms-rp_state.error  !qemu_file_get_error(rp) 
   migration_already_active(ms)) {

and now the rp qemu_file gets closed at the end of the thread,
anyone else that wants the rp_thread to exit sets the error flag.

  +migration_already_active(ms)) {
  +trace_source_return_path_thread_loop_top();
  +header_type = qemu_get_be16(rp);
  +header_len = qemu_get_be16(rp);
  +
  +switch (header_type) {
  +case MIG_RP_MSG_SHUT:
  +case MIG_RP_MSG_PONG:
  +expected_len = 4;
  +break;
  +
  +default:
  +error_report(RP: Received invalid message 0x%04x length 
  0x%04x,
  +header_type, header_len);
  +source_return_path_bad(ms);
  +goto out;
  +}
   
  +if (header_len  expected_len) {
  +error_report(RP: Received message 0x%04x with
  +incorrect length %d expecting %d,
  +header_type, header_len,
  +expected_len);
 
 I know this is a big request, but wouldn't it be good to have an array with
 message lengths and message names, so we can print nice error messages?

Done; (same way as for the commands on the forward path).

  +source_return_path_bad(ms);
  +goto out;
  +}
  +
  +/* We know we've got a valid header by this point */
  +res = qemu_get_buffer(rp, buf, header_len);
  +if (res != header_len) {
  +trace_source_return_path_thread_failed_read_cmd_data();
  +source_return_path_bad(ms);
  +goto out;
  +}
  +
  +/* OK, we have the message and the data */
  +switch (header_type) {
  +case MIG_RP_MSG_SHUT:
  +tmp32 = be32_to_cpup((uint32_t *)buf);
 
 make local variable and call it sibling_error or whatever you like?

Done.

  +trace_source_return_path_thread_shut(tmp32);
  +if (tmp32) {
  +error_report(RP: Sibling indicated error %d, tmp32);
  +source_return_path_bad(ms);
  +}
  +/*
  + * We'll let the main thread deal with closing the RP
  + * we could do a shutdown(2) on it, but we're the only user
  + * anyway, so there's nothing gained.
  + */
  +goto out;
  +
  +case MIG_RP_MSG_PONG:
  +tmp32 = be32_to_cpup((uint32_t *)buf);
 
 unused?
 Although I guess it is used somewhere to make sure that the value is
 the same as whatever we sent with the ping.  Credentials?
 
 I can't see with this and previous patch what value is sent here.

I'm not using the ping/pong messages for anything active, they exist
primarily as a debug and tracing aid.

  +trace_source_return_path_thread_pong(tmp32);
  +break;
  +
  +default:
  +break;
  +}
  +}
  +if (rp  qemu_file_get_error(rp)) {
  +trace_source_return_path_thread_bad_end();
  +source_return_path_bad(ms);
  +}
  +
  +trace_source_return_path_thread_end();
  +out:
  +return NULL;
  +}
  +
  +__attribute__ (( unused )) /* Until later in patch series */
 
 unused_by_now attribute required O:-)

Dave
--
Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK



Re: [Qemu-devel] [PATCH v7 15/42] Return path: Source handling of return path

2015-08-18 Thread Dr. David Alan Gilbert
* zhanghailiang (zhang.zhanghaili...@huawei.com) wrote:
 Hi Dave,
 
 On 2015/6/16 18:26, Dr. David Alan Gilbert (git) wrote:
 From: Dr. David Alan Gilbert dgilb...@redhat.com
 
 Open a return path, and handle messages that are received upon it.
 
 Signed-off-by: Dr. David Alan Gilbert dgilb...@redhat.com
 ---
   include/migration/migration.h |   8 ++
   migration/migration.c | 177 
  +-
   trace-events  |  12 +++
   3 files changed, 196 insertions(+), 1 deletion(-)
 
 diff --git a/include/migration/migration.h b/include/migration/migration.h
 index 36caab9..868f59a 100644
 --- a/include/migration/migration.h
 +++ b/include/migration/migration.h
 @@ -77,6 +77,14 @@ struct MigrationState
 
   int state;
   MigrationParams params;
 +
 +/* State related to return path */
 +struct {
 +QEMUFile *file;
 
 There is already a 'file' member in MigrationState,
 and since for migration there is only one path direction, just from source
 side to destination side, it is ok to use that name.
 
 But for post-copy and COLO, we need two-way communication,
 so we can rename the original 'file' member of MigrationState to 'output_file',
 and add a new 'input_file' member. For the MigrationIncomingState struct, rename
 its original 'file' member to 'input_file', and add a new 'output_file'.
 IMHO, this will make things clearer.

Would the following be clearer:

  On the source make the existing migration file:
   QEMUFile  *to_dst_file;
  and for the return path
   QEMUFile  *from_dst_dile;

  and then on the destination, the incoming migration stream:
   QEMUFile  *from_src_file;
  and then the return path on the destination:
   QEMUFile  *to_src_file;

Dave

 Thanks,
 zhanghailiang
 
 
 +QemuThreadrp_thread;
 +bool  error;
 +} rp_state;
 +
   double mbps;
   int64_t total_time;
   int64_t downtime;
 diff --git a/migration/migration.c b/migration/migration.c
 index afb19a1..fb2f491 100644
 --- a/migration/migration.c
 +++ b/migration/migration.c
 @@ -278,6 +278,23 @@ MigrationParameters *qmp_query_migrate_parameters(Error 
 **errp)
   return params;
   }
 
 +/*
 + * Return true if we're already in the middle of a migration
 + * (i.e. any of the active or setup states)
 + */
 +static bool migration_already_active(MigrationState *ms)
 +{
 +switch (ms-state) {
 +case MIGRATION_STATUS_ACTIVE:
 +case MIGRATION_STATUS_SETUP:
 +return true;
 +
 +default:
 +return false;
 +
 +}
 +}
 +
   static void get_xbzrle_cache_stats(MigrationInfo *info)
   {
   if (migrate_use_xbzrle()) {
 @@ -441,6 +458,21 @@ static void migrate_set_state(MigrationState *s, int 
 old_state, int new_state)
   }
   }
 
 +static void migrate_fd_cleanup_src_rp(MigrationState *ms)
 +{
 +QEMUFile *rp = ms-rp_state.file;
 +
 +/*
 + * When stuff goes wrong (e.g. failing destination) on the rp, it can 
 get
 + * cleaned up from a few threads; make sure not to do it twice in 
 parallel
 + */
 +rp = atomic_cmpxchg(ms-rp_state.file, rp, NULL);
 +if (rp) {
 +trace_migrate_fd_cleanup_src_rp();
 +qemu_fclose(rp);
 +}
 +}
 +
   static void migrate_fd_cleanup(void *opaque)
   {
   MigrationState *s = opaque;
 @@ -448,6 +480,8 @@ static void migrate_fd_cleanup(void *opaque)
   qemu_bh_delete(s-cleanup_bh);
   s-cleanup_bh = NULL;
 
 +migrate_fd_cleanup_src_rp(s);
 +
   if (s-file) {
   trace_migrate_fd_cleanup();
   qemu_mutex_unlock_iothread();
 @@ -487,6 +521,11 @@ static void migrate_fd_cancel(MigrationState *s)
   QEMUFile *f = migrate_get_current()-file;
   trace_migrate_fd_cancel();
 
 +if (s-rp_state.file) {
 +/* shutdown the rp socket, so causing the rp thread to shutdown */
 +qemu_file_shutdown(s-rp_state.file);
 +}
 +
   do {
   old_state = s-state;
   if (old_state != MIGRATION_STATUS_SETUP 
 @@ -801,8 +840,144 @@ int64_t migrate_xbzrle_cache_size(void)
   return s-xbzrle_cache_size;
   }
 
 -/* migration thread support */
 +/*
 + * Something bad happened to the RP stream, mark an error
 + * The caller shall print something to indicate why
 + */
 +static void source_return_path_bad(MigrationState *s)
 +{
 +s-rp_state.error = true;
 +migrate_fd_cleanup_src_rp(s);
 +}
 +
 +/*
 + * Handles messages sent on the return path towards the source VM
 + *
 + */
 +static void *source_return_path_thread(void *opaque)
 +{
 +MigrationState *ms = opaque;
 +QEMUFile *rp = ms-rp_state.file;
 +uint16_t expected_len, header_len, header_type;
 +const int max_len = 512;
 +uint8_t buf[max_len];
 +uint32_t tmp32;
 +int res;
 +
 +trace_source_return_path_thread_entry();
 +while (rp  !qemu_file_get_error(rp) 
 +migration_already_active(ms)) {
 +trace_source_return_path_thread_loop_top();
 +   

Re: [Qemu-devel] [PATCH v7 15/42] Return path: Source handling of return path

2015-08-18 Thread zhanghailiang

On 2015/8/18 18:45, Dr. David Alan Gilbert wrote:

* zhanghailiang (zhang.zhanghaili...@huawei.com) wrote:

Hi Dave,

On 2015/6/16 18:26, Dr. David Alan Gilbert (git) wrote:

From: Dr. David Alan Gilbert dgilb...@redhat.com

Open a return path, and handle messages that are received upon it.

Signed-off-by: Dr. David Alan Gilbert dgilb...@redhat.com
---
  include/migration/migration.h |   8 ++
  migration/migration.c | 177 +-
  trace-events  |  12 +++
  3 files changed, 196 insertions(+), 1 deletion(-)

diff --git a/include/migration/migration.h b/include/migration/migration.h
index 36caab9..868f59a 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -77,6 +77,14 @@ struct MigrationState

  int state;
  MigrationParams params;
+
+/* State related to return path */
+struct {
+QEMUFile *file;


There is already a 'file' member in MigrationState,
and since for migration, there is only one path direction, just from source side
to destination side, so it is ok to use that name.

But for post-copy and COLO, we need two-way communication,
so we can rename the original 'file' member of MigrationState to 'output_file',
and add a new 'input_file' member. For the MigrationIncomingState struct, rename
its original 'file' member to 'input_file', and add a new 'output_file'.
IMHO, this will make things more clear.


Would the following be clearer:



Yes, it is clearer and  more graceful :)


   On the source make the existing migration file:
QEMUFile  *to_dst_file;
   and for the return path
QEMUFile  *from_dst_dile;
^

 from_dst_file


   and then on the destination, the incoming migration stream:
QEMUFile  *from_src_file;
   and then the return path on the destination:
QEMUFile  *to_src_file;

Dave


Thanks,
zhanghailiang



+QemuThreadrp_thread;
+bool  error;
+} rp_state;
+
  double mbps;
  int64_t total_time;
  int64_t downtime;
diff --git a/migration/migration.c b/migration/migration.c
index afb19a1..fb2f491 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -278,6 +278,23 @@ MigrationParameters *qmp_query_migrate_parameters(Error 
**errp)
  return params;
  }

+/*
+ * Return true if we're already in the middle of a migration
+ * (i.e. any of the active or setup states)
+ */
+static bool migration_already_active(MigrationState *ms)
+{
+switch (ms-state) {
+case MIGRATION_STATUS_ACTIVE:
+case MIGRATION_STATUS_SETUP:
+return true;
+
+default:
+return false;
+
+}
+}
+
  static void get_xbzrle_cache_stats(MigrationInfo *info)
  {
  if (migrate_use_xbzrle()) {
@@ -441,6 +458,21 @@ static void migrate_set_state(MigrationState *s, int 
old_state, int new_state)
  }
  }

+static void migrate_fd_cleanup_src_rp(MigrationState *ms)
+{
+QEMUFile *rp = ms-rp_state.file;
+
+/*
+ * When stuff goes wrong (e.g. failing destination) on the rp, it can get
+ * cleaned up from a few threads; make sure not to do it twice in parallel
+ */
+rp = atomic_cmpxchg(ms-rp_state.file, rp, NULL);
+if (rp) {
+trace_migrate_fd_cleanup_src_rp();
+qemu_fclose(rp);
+}
+}
+
  static void migrate_fd_cleanup(void *opaque)
  {
  MigrationState *s = opaque;
@@ -448,6 +480,8 @@ static void migrate_fd_cleanup(void *opaque)
  qemu_bh_delete(s-cleanup_bh);
  s-cleanup_bh = NULL;

+migrate_fd_cleanup_src_rp(s);
+
  if (s-file) {
  trace_migrate_fd_cleanup();
  qemu_mutex_unlock_iothread();
@@ -487,6 +521,11 @@ static void migrate_fd_cancel(MigrationState *s)
  QEMUFile *f = migrate_get_current()-file;
  trace_migrate_fd_cancel();

+if (s-rp_state.file) {
+/* shutdown the rp socket, so causing the rp thread to shutdown */
+qemu_file_shutdown(s-rp_state.file);
+}
+
  do {
  old_state = s-state;
  if (old_state != MIGRATION_STATUS_SETUP 
@@ -801,8 +840,144 @@ int64_t migrate_xbzrle_cache_size(void)
  return s-xbzrle_cache_size;
  }

-/* migration thread support */
+/*
+ * Something bad happened to the RP stream, mark an error
+ * The caller shall print something to indicate why
+ */
+static void source_return_path_bad(MigrationState *s)
+{
+s-rp_state.error = true;
+migrate_fd_cleanup_src_rp(s);
+}
+
+/*
+ * Handles messages sent on the return path towards the source VM
+ *
+ */
+static void *source_return_path_thread(void *opaque)
+{
+MigrationState *ms = opaque;
+QEMUFile *rp = ms-rp_state.file;
+uint16_t expected_len, header_len, header_type;
+const int max_len = 512;
+uint8_t buf[max_len];
+uint32_t tmp32;
+int res;
+
+trace_source_return_path_thread_entry();
+while (rp  !qemu_file_get_error(rp) 
+migration_already_active(ms)) {
+

Re: [Qemu-devel] [PATCH v7 15/42] Return path: Source handling of return path

2015-08-05 Thread zhanghailiang

Hi Dave,

On 2015/6/16 18:26, Dr. David Alan Gilbert (git) wrote:

From: Dr. David Alan Gilbert dgilb...@redhat.com

Open a return path, and handle messages that are received upon it.

Signed-off-by: Dr. David Alan Gilbert dgilb...@redhat.com
---
  include/migration/migration.h |   8 ++
  migration/migration.c | 177 +-
  trace-events  |  12 +++
  3 files changed, 196 insertions(+), 1 deletion(-)

diff --git a/include/migration/migration.h b/include/migration/migration.h
index 36caab9..868f59a 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -77,6 +77,14 @@ struct MigrationState

  int state;
  MigrationParams params;
+
+/* State related to return path */
+struct {
+QEMUFile *file;


There is already a 'file' member in MigrationState,
and since for migration, there is only one path direction, just from source side
to destination side, so it is ok to use that name.

But for post-copy and COLO, we need two-way communication,
so we can rename the original 'file' member of MigrationState to 'output_file',
and add a new 'input_file' member. For the MigrationIncomingState struct, rename
its original 'file' member to 'input_file', and add a new 'output_file'.
IMHO, this will make things more clear.

Thanks,
zhanghailiang



+QemuThreadrp_thread;
+bool  error;
+} rp_state;
+
  double mbps;
  int64_t total_time;
  int64_t downtime;
diff --git a/migration/migration.c b/migration/migration.c
index afb19a1..fb2f491 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -278,6 +278,23 @@ MigrationParameters *qmp_query_migrate_parameters(Error 
**errp)
  return params;
  }

+/*
+ * Return true if we're already in the middle of a migration
+ * (i.e. any of the active or setup states)
+ */
+static bool migration_already_active(MigrationState *ms)
+{
+switch (ms-state) {
+case MIGRATION_STATUS_ACTIVE:
+case MIGRATION_STATUS_SETUP:
+return true;
+
+default:
+return false;
+
+}
+}
+
  static void get_xbzrle_cache_stats(MigrationInfo *info)
  {
  if (migrate_use_xbzrle()) {
@@ -441,6 +458,21 @@ static void migrate_set_state(MigrationState *s, int 
old_state, int new_state)
  }
  }

+static void migrate_fd_cleanup_src_rp(MigrationState *ms)
+{
+QEMUFile *rp = ms-rp_state.file;
+
+/*
+ * When stuff goes wrong (e.g. failing destination) on the rp, it can get
+ * cleaned up from a few threads; make sure not to do it twice in parallel
+ */
+rp = atomic_cmpxchg(ms-rp_state.file, rp, NULL);
+if (rp) {
+trace_migrate_fd_cleanup_src_rp();
+qemu_fclose(rp);
+}
+}
+
  static void migrate_fd_cleanup(void *opaque)
  {
  MigrationState *s = opaque;
@@ -448,6 +480,8 @@ static void migrate_fd_cleanup(void *opaque)
  qemu_bh_delete(s-cleanup_bh);
  s-cleanup_bh = NULL;

+migrate_fd_cleanup_src_rp(s);
+
  if (s-file) {
  trace_migrate_fd_cleanup();
  qemu_mutex_unlock_iothread();
@@ -487,6 +521,11 @@ static void migrate_fd_cancel(MigrationState *s)
  QEMUFile *f = migrate_get_current()-file;
  trace_migrate_fd_cancel();

+if (s-rp_state.file) {
+/* shutdown the rp socket, so causing the rp thread to shutdown */
+qemu_file_shutdown(s-rp_state.file);
+}
+
  do {
  old_state = s-state;
  if (old_state != MIGRATION_STATUS_SETUP 
@@ -801,8 +840,144 @@ int64_t migrate_xbzrle_cache_size(void)
  return s-xbzrle_cache_size;
  }

-/* migration thread support */
+/*
+ * Something bad happened to the RP stream, mark an error
+ * The caller shall print something to indicate why
+ */
+static void source_return_path_bad(MigrationState *s)
+{
+s-rp_state.error = true;
+migrate_fd_cleanup_src_rp(s);
+}
+
+/*
+ * Handles messages sent on the return path towards the source VM
+ *
+ */
+static void *source_return_path_thread(void *opaque)
+{
+MigrationState *ms = opaque;
+QEMUFile *rp = ms-rp_state.file;
+uint16_t expected_len, header_len, header_type;
+const int max_len = 512;
+uint8_t buf[max_len];
+uint32_t tmp32;
+int res;
+
+trace_source_return_path_thread_entry();
+while (rp  !qemu_file_get_error(rp) 
+migration_already_active(ms)) {
+trace_source_return_path_thread_loop_top();
+header_type = qemu_get_be16(rp);
+header_len = qemu_get_be16(rp);
+
+switch (header_type) {
+case MIG_RP_MSG_SHUT:
+case MIG_RP_MSG_PONG:
+expected_len = 4;
+break;
+
+default:
+error_report(RP: Received invalid message 0x%04x length 0x%04x,
+header_type, header_len);
+source_return_path_bad(ms);
+goto out;
+}

+if (header_len  expected_len) {
+error_report(RP: Received message 0x%04x with
+

Re: [Qemu-devel] [PATCH v7 15/42] Return path: Source handling of return path

2015-07-16 Thread Dr. David Alan Gilbert
* Amit Shah (amit.s...@redhat.com) wrote:
 On (Tue) 16 Jun 2015 [11:26:28], Dr. David Alan Gilbert (git) wrote:
  From: Dr. David Alan Gilbert dgilb...@redhat.com
  
  Open a return path, and handle messages that are received upon it.
  
  Signed-off-by: Dr. David Alan Gilbert dgilb...@redhat.com
 
 
  -/* migration thread support */
  +/*
  + * Something bad happened to the RP stream, mark an error
  + * The caller shall print something to indicate why
  + */
  +static void source_return_path_bad(MigrationState *s)
 
 Can you rename this to something like
 
 mark_source_rp_bad()
 
 ?
 
 Intent is clearer that way.

Done.

 Also, the comment says caller will print something, but the
 invocations below are a mix of printfs and traces.  Not saying the
 caller has to print always, but maybe only comment needs update.

Yes, I've changed the comment, and changed one of the traces into
an error_report.

Thanks,

Dave

 
  +{
  +s-rp_state.error = true;
  +migrate_fd_cleanup_src_rp(s);
  +}
  +
  +/*
  + * Handles messages sent on the return path towards the source VM
  + *
  + */
  +static void *source_return_path_thread(void *opaque)
  +{
  +MigrationState *ms = opaque;
  +QEMUFile *rp = ms-rp_state.file;
  +uint16_t expected_len, header_len, header_type;
  +const int max_len = 512;
  +uint8_t buf[max_len];
  +uint32_t tmp32;
  +int res;
  +
  +trace_source_return_path_thread_entry();
  +while (rp  !qemu_file_get_error(rp) 
  +migration_already_active(ms)) {
  +trace_source_return_path_thread_loop_top();
  +header_type = qemu_get_be16(rp);
  +header_len = qemu_get_be16(rp);
  +
  +switch (header_type) {
  +case MIG_RP_MSG_SHUT:
  +case MIG_RP_MSG_PONG:
  +expected_len = 4;
  +break;
  +
  +default:
  +error_report(RP: Received invalid message 0x%04x length 
  0x%04x,
  +header_type, header_len);
  +source_return_path_bad(ms);
  +goto out;
  +}
   
  +if (header_len  expected_len) {
  +error_report(RP: Received message 0x%04x with
  +incorrect length %d expecting %d,
  +header_type, header_len,
  +expected_len);
  +source_return_path_bad(ms);
  +goto out;
  +}
  +
  +/* We know we've got a valid header by this point */
  +res = qemu_get_buffer(rp, buf, header_len);
  +if (res != header_len) {
  +trace_source_return_path_thread_failed_read_cmd_data();
  +source_return_path_bad(ms);
  +goto out;
  +}
 
   Amit
--
Dr. David Alan Gilbert / dgilb...@redhat.com / Manchester, UK



Re: [Qemu-devel] [PATCH v7 15/42] Return path: Source handling of return path

2015-07-15 Thread Amit Shah
On (Tue) 16 Jun 2015 [11:26:28], Dr. David Alan Gilbert (git) wrote:
 From: Dr. David Alan Gilbert dgilb...@redhat.com
 
 Open a return path, and handle messages that are received upon it.
 
 Signed-off-by: Dr. David Alan Gilbert dgilb...@redhat.com


 -/* migration thread support */
 +/*
 + * Something bad happened to the RP stream, mark an error
 + * The caller shall print something to indicate why
 + */
 +static void source_return_path_bad(MigrationState *s)

Can you rename this to something like

mark_source_rp_bad()

?

Intent is clearer that way.

Also, the comment says caller will print something, but the
invocations below are a mix of printfs and traces.  Not saying the
caller has to print always, but maybe only comment needs update.

 +{
 +s-rp_state.error = true;
 +migrate_fd_cleanup_src_rp(s);
 +}
 +
 +/*
 + * Handles messages sent on the return path towards the source VM
 + *
 + */
 +static void *source_return_path_thread(void *opaque)
 +{
 +MigrationState *ms = opaque;
 +QEMUFile *rp = ms-rp_state.file;
 +uint16_t expected_len, header_len, header_type;
 +const int max_len = 512;
 +uint8_t buf[max_len];
 +uint32_t tmp32;
 +int res;
 +
 +trace_source_return_path_thread_entry();
 +while (rp  !qemu_file_get_error(rp) 
 +migration_already_active(ms)) {
 +trace_source_return_path_thread_loop_top();
 +header_type = qemu_get_be16(rp);
 +header_len = qemu_get_be16(rp);
 +
 +switch (header_type) {
 +case MIG_RP_MSG_SHUT:
 +case MIG_RP_MSG_PONG:
 +expected_len = 4;
 +break;
 +
 +default:
 +error_report(RP: Received invalid message 0x%04x length 0x%04x,
 +header_type, header_len);
 +source_return_path_bad(ms);
 +goto out;
 +}
  
 +if (header_len  expected_len) {
 +error_report(RP: Received message 0x%04x with
 +incorrect length %d expecting %d,
 +header_type, header_len,
 +expected_len);
 +source_return_path_bad(ms);
 +goto out;
 +}
 +
 +/* We know we've got a valid header by this point */
 +res = qemu_get_buffer(rp, buf, header_len);
 +if (res != header_len) {
 +trace_source_return_path_thread_failed_read_cmd_data();
 +source_return_path_bad(ms);
 +goto out;
 +}

Amit



Re: [Qemu-devel] [PATCH v7 15/42] Return path: Source handling of return path

2015-07-13 Thread Juan Quintela
Dr. David Alan Gilbert (git) dgilb...@redhat.com wrote:
 From: Dr. David Alan Gilbert dgilb...@redhat.com

 Open a return path, and handle messages that are received upon it.

 Signed-off-by: Dr. David Alan Gilbert dgilb...@redhat.com
 +/*
 + * Handles messages sent on the return path towards the source VM
 + *
 + */
 +static void *source_return_path_thread(void *opaque)
 +{
 +MigrationState *ms = opaque;
 +QEMUFile *rp = ms-rp_state.file;
 +uint16_t expected_len, header_len, header_type;
 +const int max_len = 512;
 +uint8_t buf[max_len];
 +uint32_t tmp32;
 +int res;
 +
 +trace_source_return_path_thread_entry();
 +while (rp  !qemu_file_get_error(rp) 

What can make rp == NULL?
Thinking about that, could you mean *rp here?


 +migration_already_active(ms)) {
 +trace_source_return_path_thread_loop_top();
 +header_type = qemu_get_be16(rp);
 +header_len = qemu_get_be16(rp);
 +
 +switch (header_type) {
 +case MIG_RP_MSG_SHUT:
 +case MIG_RP_MSG_PONG:
 +expected_len = 4;
 +break;
 +
 +default:
 +error_report(RP: Received invalid message 0x%04x length 0x%04x,
 +header_type, header_len);
 +source_return_path_bad(ms);
 +goto out;
 +}
  
 +if (header_len  expected_len) {
 +error_report(RP: Received message 0x%04x with
 +incorrect length %d expecting %d,
 +header_type, header_len,
 +expected_len);

I know this is a big request, but wouldn't it be good to have an array with
message lengths and message names, so we can print nice error messages?

 +source_return_path_bad(ms);
 +goto out;
 +}
 +
 +/* We know we've got a valid header by this point */
 +res = qemu_get_buffer(rp, buf, header_len);
 +if (res != header_len) {
 +trace_source_return_path_thread_failed_read_cmd_data();
 +source_return_path_bad(ms);
 +goto out;
 +}
 +
 +/* OK, we have the message and the data */
 +switch (header_type) {
 +case MIG_RP_MSG_SHUT:
 +tmp32 = be32_to_cpup((uint32_t *)buf);

make local variable and call it sibling_error or whatever you like?

 +trace_source_return_path_thread_shut(tmp32);
 +if (tmp32) {
 +error_report(RP: Sibling indicated error %d, tmp32);
 +source_return_path_bad(ms);
 +}
 +/*
 + * We'll let the main thread deal with closing the RP
 + * we could do a shutdown(2) on it, but we're the only user
 + * anyway, so there's nothing gained.
 + */
 +goto out;
 +
 +case MIG_RP_MSG_PONG:
 +tmp32 = be32_to_cpup((uint32_t *)buf);

unused?
Although I guess it is used somewhere to make sure that the value is
the same as whatever we sent with the ping.  Credentials?

I can't see with this and previous patch what value is sent here.


 +trace_source_return_path_thread_pong(tmp32);
 +break;
 +
 +default:
 +break;
 +}
 +}
 +if (rp  qemu_file_get_error(rp)) {
 +trace_source_return_path_thread_bad_end();
 +source_return_path_bad(ms);
 +}
 +
 +trace_source_return_path_thread_end();
 +out:
 +return NULL;
 +}
 +
 +__attribute__ (( unused )) /* Until later in patch series */

unused_by_now attribute required O:-)




[Qemu-devel] [PATCH v7 15/42] Return path: Source handling of return path

2015-06-16 Thread Dr. David Alan Gilbert (git)
From: Dr. David Alan Gilbert dgilb...@redhat.com

Open a return path, and handle messages that are received upon it.

Signed-off-by: Dr. David Alan Gilbert dgilb...@redhat.com
---
 include/migration/migration.h |   8 ++
 migration/migration.c | 177 +-
 trace-events  |  12 +++
 3 files changed, 196 insertions(+), 1 deletion(-)

diff --git a/include/migration/migration.h b/include/migration/migration.h
index 36caab9..868f59a 100644
--- a/include/migration/migration.h
+++ b/include/migration/migration.h
@@ -77,6 +77,14 @@ struct MigrationState
 
 int state;
 MigrationParams params;
+
+/* State related to return path */
+struct {
+QEMUFile *file;
+QemuThreadrp_thread;
+bool  error;
+} rp_state;
+
 double mbps;
 int64_t total_time;
 int64_t downtime;
diff --git a/migration/migration.c b/migration/migration.c
index afb19a1..fb2f491 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -278,6 +278,23 @@ MigrationParameters *qmp_query_migrate_parameters(Error 
**errp)
 return params;
 }
 
+/*
+ * Return true if we're already in the middle of a migration
+ * (i.e. any of the active or setup states)
+ */
+static bool migration_already_active(MigrationState *ms)
+{
+switch (ms-state) {
+case MIGRATION_STATUS_ACTIVE:
+case MIGRATION_STATUS_SETUP:
+return true;
+
+default:
+return false;
+
+}
+}
+
 static void get_xbzrle_cache_stats(MigrationInfo *info)
 {
 if (migrate_use_xbzrle()) {
@@ -441,6 +458,21 @@ static void migrate_set_state(MigrationState *s, int 
old_state, int new_state)
 }
 }
 
+static void migrate_fd_cleanup_src_rp(MigrationState *ms)
+{
+QEMUFile *rp = ms-rp_state.file;
+
+/*
+ * When stuff goes wrong (e.g. failing destination) on the rp, it can get
+ * cleaned up from a few threads; make sure not to do it twice in parallel
+ */
+rp = atomic_cmpxchg(ms-rp_state.file, rp, NULL);
+if (rp) {
+trace_migrate_fd_cleanup_src_rp();
+qemu_fclose(rp);
+}
+}
+
 static void migrate_fd_cleanup(void *opaque)
 {
 MigrationState *s = opaque;
@@ -448,6 +480,8 @@ static void migrate_fd_cleanup(void *opaque)
 qemu_bh_delete(s-cleanup_bh);
 s-cleanup_bh = NULL;
 
+migrate_fd_cleanup_src_rp(s);
+
 if (s-file) {
 trace_migrate_fd_cleanup();
 qemu_mutex_unlock_iothread();
@@ -487,6 +521,11 @@ static void migrate_fd_cancel(MigrationState *s)
 QEMUFile *f = migrate_get_current()-file;
 trace_migrate_fd_cancel();
 
+if (s-rp_state.file) {
+/* shutdown the rp socket, so causing the rp thread to shutdown */
+qemu_file_shutdown(s-rp_state.file);
+}
+
 do {
 old_state = s-state;
 if (old_state != MIGRATION_STATUS_SETUP 
@@ -801,8 +840,144 @@ int64_t migrate_xbzrle_cache_size(void)
 return s-xbzrle_cache_size;
 }
 
-/* migration thread support */
+/*
+ * Something bad happened to the RP stream, mark an error
+ * The caller shall print something to indicate why
+ */
+static void source_return_path_bad(MigrationState *s)
+{
+s-rp_state.error = true;
+migrate_fd_cleanup_src_rp(s);
+}
+
+/*
+ * Handles messages sent on the return path towards the source VM
+ *
+ */
+static void *source_return_path_thread(void *opaque)
+{
+MigrationState *ms = opaque;
+QEMUFile *rp = ms-rp_state.file;
+uint16_t expected_len, header_len, header_type;
+const int max_len = 512;
+uint8_t buf[max_len];
+uint32_t tmp32;
+int res;
+
+trace_source_return_path_thread_entry();
+while (rp  !qemu_file_get_error(rp) 
+migration_already_active(ms)) {
+trace_source_return_path_thread_loop_top();
+header_type = qemu_get_be16(rp);
+header_len = qemu_get_be16(rp);
+
+switch (header_type) {
+case MIG_RP_MSG_SHUT:
+case MIG_RP_MSG_PONG:
+expected_len = 4;
+break;
+
+default:
+error_report(RP: Received invalid message 0x%04x length 0x%04x,
+header_type, header_len);
+source_return_path_bad(ms);
+goto out;
+}
 
+if (header_len  expected_len) {
+error_report(RP: Received message 0x%04x with
+incorrect length %d expecting %d,
+header_type, header_len,
+expected_len);
+source_return_path_bad(ms);
+goto out;
+}
+
+/* We know we've got a valid header by this point */
+res = qemu_get_buffer(rp, buf, header_len);
+if (res != header_len) {
+trace_source_return_path_thread_failed_read_cmd_data();
+source_return_path_bad(ms);
+goto out;
+}
+
+/* OK, we have the message and the data */
+switch (header_type) {
+case MIG_RP_MSG_SHUT:
+tmp32 =