This is still a work in progress, but get everything sent as expected and it is faster than the code that is already there.
Signed-off-by: Juan Quintela <quint...@redhat.com> --- migration/ram.c | 106 +++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 104 insertions(+), 2 deletions(-) diff --git a/migration/ram.c b/migration/ram.c index fdb5bf07a5..efbb253c1a 100644 --- a/migration/ram.c +++ b/migration/ram.c @@ -747,6 +747,100 @@ MultifdMethods multifd_none_ops = { .recv_pages = none_recv_pages }; +/* Multifd zlib compression */ + +static int zlib_send_prepare(MultiFDSendParams *p, uint32_t used) +{ + struct iovec *iov = p->pages->iov; + z_stream *zs = &p->zs; + uint32_t out_size = 0; + int ret; + int i; + + for (i = 0; i < used; i++) { + uint32_t available = p->zbuff_len - out_size; + int flush = Z_NO_FLUSH; + + if (i == used - 1) { + flush = Z_SYNC_FLUSH; + } + + zs->avail_in = iov[i].iov_len; + zs->next_in = iov[i].iov_base; + + zs->avail_out = available; + zs->next_out = p->zbuff + out_size; + + ret = deflate(zs, flush); + if (ret != Z_OK) { + printf("problem with deflate? %d\n", ret); + qemu_mutex_unlock(&p->mutex); + return -1; + } + out_size += available - zs->avail_out; + } + p->next_packet_size = out_size; + + return 0; +} + +static int zlib_send_write(MultiFDSendParams *p, uint32_t used, Error **perr) +{ + return qio_channel_write_all(p->c, (void *)p->zbuff, p->next_packet_size, + perr); +} + +static int zlib_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **perr) +{ + uint32_t in_size = p->next_packet_size; + uint32_t out_size = 0; + uint32_t expected_size = used * qemu_target_page_size(); + z_stream *zs = &p->zs; + int ret; + int i; + + ret = qio_channel_read_all(p->c, (void *)p->zbuff, in_size, perr); + + if (ret != 0) { + return ret; + } + + zs->avail_in = in_size; + zs->next_in = p->zbuff; + + for (i = 0; i < used; i++) { + struct iovec *iov = &p->pages->iov[i]; + int flush = Z_NO_FLUSH; + + if (i == used - 1) { + flush = Z_SYNC_FLUSH; + } + + zs->avail_out = iov->iov_len; + zs->next_out = iov->iov_base; + + ret = inflate(zs, flush); + if (ret != Z_OK) { + printf("%d: problem with inflate? %d\n", p->id, ret); + qemu_mutex_unlock(&p->mutex); + return ret; + } + out_size += iov->iov_len; + } + if (out_size != expected_size) { + printf("out size %d expected size %d\n", + out_size, expected_size); + return -1; + } + return 0; +} + +MultifdMethods multifd_zlib_ops = { + .send_prepare = zlib_send_prepare, + .send_write = zlib_send_write, + .recv_pages = zlib_recv_pages +}; + static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp) { MultiFDInit_t msg; @@ -1145,7 +1239,11 @@ static void *multifd_send_thread(void *opaque) /* initial packet */ p->num_packets = 1; - multifd_send_state->ops = &multifd_none_ops; + if (migrate_use_multifd_zlib()) { + multifd_send_state->ops = &multifd_zlib_ops; + } else { + multifd_send_state->ops = &multifd_none_ops; + } while (true) { qemu_sem_wait(&p->sem); @@ -1399,7 +1497,11 @@ static void *multifd_recv_thread(void *opaque) trace_multifd_recv_thread_start(p->id); rcu_register_thread(); - multifd_recv_state->ops = &multifd_none_ops; + if (migrate_use_multifd_zlib()) { + multifd_recv_state->ops = &multifd_zlib_ops; + } else { + multifd_recv_state->ops = &multifd_none_ops; + } while (true) { uint32_t used; uint32_t flags; -- 2.21.0