"Liu, Yuan1" <yuan1....@intel.com> writes:

>> -----Original Message-----
>> From: Fabiano Rosas <faro...@suse.de>
>> Sent: Monday, May 13, 2024 11:14 PM
>> To: Liu, Yuan1 <yuan1....@intel.com>; pet...@redhat.com
>> Cc: qemu-devel@nongnu.org; Liu, Yuan1 <yuan1....@intel.com>; Zou, Nanhai
>> <nanhai....@intel.com>
>> Subject: Re: [PATCH v6 6/7] migration/multifd: implement qpl compression
>> and decompression
>> 
>> Yuan Liu <yuan1....@intel.com> writes:
>> 
>> > each qpl job is used to (de)compress a normal page and it can
>> > be processed independently by the IAA hardware. All qpl jobs
>> > are submitted to the hardware at once, and wait for all jobs
>> > completion. If hardware path(IAA) is not available, use software
>> > for compression and decompression.
>> >
>> > Signed-off-by: Yuan Liu <yuan1....@intel.com>
>> > Reviewed-by: Nanhai Zou <nanhai....@intel.com>
>> > ---
>> >  migration/multifd-qpl.c | 284 +++++++++++++++++++++++++++++++++++++++-
>> >  1 file changed, 280 insertions(+), 4 deletions(-)
>> >
>> > diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
>> > index 89fa51091a..9a1fddbdd0 100644
>> > --- a/migration/multifd-qpl.c
>> > +++ b/migration/multifd-qpl.c
>> > @@ -13,6 +13,7 @@
>> >  #include "qemu/osdep.h"
>> >  #include "qemu/module.h"
>> >  #include "qapi/error.h"
>> > +#include "exec/ramblock.h"
>> >  #include "migration.h"
>> >  #include "multifd.h"
>> >  #include "qpl/qpl.h"
>> > @@ -204,6 +205,139 @@ static void
>> multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
>> >      p->iov = NULL;
>> >  }
>> >
>> > +/**
>> > + * multifd_qpl_prepare_job: prepare a compression or decompression job
>> > + *
>> > + * Prepare a compression or decompression job and configure job
>> attributes
>> > + * including job compression level and flags.
>> > + *
>> > + * @job: pointer to the QplData structure
>> 
>> qpl_job structure
>
> Thanks for the comment, I will fix this next version.
>
>> > + * @is_compression: compression or decompression indication
>> > + * @input: pointer to the input data buffer
>> > + * @input_len: the length of the input data
>> > + * @output: pointer to the output data buffer
>> > + * @output_len: the size of the output data buffer
>> > + */
>> > +static void multifd_qpl_prepare_job(qpl_job *job, bool is_compression,
>> > +                                    uint8_t *input, uint32_t input_len,
>> > +                                    uint8_t *output, uint32_t
>> output_len)
>> > +{
>> > +    job->op = is_compression ? qpl_op_compress : qpl_op_decompress;
>> > +    job->next_in_ptr = input;
>> > +    job->next_out_ptr = output;
>> > +    job->available_in = input_len;
>> > +    job->available_out = output_len;
>> > +    job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
>> > +    /* only supports one compression level */
>> > +    job->level = 1;
>> > +}
>> > +
>> > +/**
>> > + * multifd_qpl_build_packet: build a qpl compressed data packet
>> > + *
>> > + * The qpl compressed data packet consists of two parts, one part
>> stores
>> > + * the compressed length of each page, and the other part is the
>> compressed
>> > + * data of each page. The zbuf_hdr stores the compressed length of all
>> pages,
>> > + * and use a separate IOV to store the compressed data of each page.
>> > + *
>> > + * @qpl: pointer to the QplData structure
>> > + * @p: Params for the channel that we are using
>> > + * @idx: The index of the compressed length array
>> > + * @addr: pointer to the compressed data
>> > + * @len: The length of the compressed data
>> > + */
>> > +static void multifd_qpl_build_packet(QplData *qpl, MultiFDSendParams
>> *p,
>> > +                                     uint32_t idx, uint8_t *addr,
>> uint32_t len)
>> > +{
>> > +    qpl->zbuf_hdr[idx] = cpu_to_be32(len);
>> > +    p->iov[p->iovs_num].iov_base = addr;
>> > +    p->iov[p->iovs_num].iov_len = len;
>> > +    p->iovs_num++;
>> > +    p->next_packet_size += len;
>> > +}
>> > +
>> > +/**
>> > + * multifd_qpl_compress_pages: compress normal pages
>> > + *
>> > + * Each normal page will be compressed independently, and the
>> compression jobs
>> > + * will be submitted to the IAA hardware in non-blocking mode, waiting
>> for all
>> > + * jobs to be completed and filling the compressed length and data into
>> the
>> > + * sending IOVs. If IAA device is not available, the software path is
>> used.
>> > + *
>> > + * Returns 0 for success or -1 for error
>> > + *
>> > + * @p: Params for the channel that we are using
>> > + * @errp: pointer to an error
>> > + */
>> > +static int multifd_qpl_compress_pages(MultiFDSendParams *p, Error
>> **errp)
>> > +{
>> > +    qpl_status status;
>> > +    QplData *qpl = p->compress_data;
>> > +    MultiFDPages_t *pages = p->pages;
>> > +    uint8_t *zbuf = qpl->zbuf;
>> > +    uint8_t *host = pages->block->host;
>> > +    uint32_t job_num = pages->normal_num;
>> 
>> A bit misleading because job_num is used in the previous patch as a
>> synonym for page_count. We could change the previous patch to:
>> multifd_qpl_init(uint32_t page_count, ...
>
> Yes, I will replace job_num with page_count for multifd_qpl_init next version.
>
>> > +    qpl_job *job = NULL;
>> > +
>> > +    assert(job_num <= qpl->total_job_num);
>> > +    /* submit all compression jobs */
>> > +    for (int i = 0; i < job_num; i++) {
>> > +        job = qpl->job_array[i];
>> > +        multifd_qpl_prepare_job(job, true, host + pages->offset[i],
>> > +                                p->page_size, zbuf, p->page_size - 1);
>> 
>> Isn't the output buffer size == page size, why the -1?
>
> I think the compressed data should be smaller than a normal page. 
> If the compressed data size is equal to a normal page, send the original 
> page directly to avoid decompression operation.
>
> If the output buffer size is set to p->page_size, the compressed data size 
> may be greater than or equal to a normal page, then there are two conditions 
> for
> sending a normal page, one is job status == QPL_STS_OK and job->total_out == 
> p->page_size,
> another is job status == QPL_STS_MORE_OUTPUT_NEEDED.
>
> If the output buffer size is p->page_size - 1, only check 
> QPL_STS_MORE_OUTPUT_NEEDED is ok. 
> I will add comments for this in the next version.
>

Thanks.

>> > +        /* if hardware path(IAA) is unavailable, call the software path
>> */
>> 
>> If we're doing the fallback automatically, isn't that what qpl_path_auto
>> does already? What's the difference between the two approaches?
>
> The qpl_path_auto feature currently has some limitations.
>
> Limitation 1: it does not detect IAA device status before job submission, 
> which means
> I need to use qpl_init_job(qpl_path_hardware, ...) first to make sure the 
> hardware path
> is available, then qpl_path_auto can work for software fallback when 
> submitting a job to 
> the IAA fails.
>
> Limitation 2: The job submission API has changed
> For the qpl_path_hardware, the qpl_submit_job is a non-blocking function, 
> only submitting
> a job to IAA work queues, use qpl_wait_job to get the job result.
>
> For qpl_path_software/auto, the qpl_submit_job is a blocking function and 
> returns the job
> result directly,  qpl_wait_job can't get the job result.
>
> In short, the qpl_submit_job/qpl_wait_job APIs do not well support the 
> qpl_path_auto feature.

Please put a summary of this in the commit message so in the future we
can evaluate whether to continue checking for ourselves.

>
>> > +        if (!qpl->iaa_avail) {
>> 
>> This function got a bit convoluted, it's probably worth a check at the
>> start and a branch to different multifd_qpl_compress_pages_slow()
>> routine altogether.
>
> I agree that using multifd_qpl_compress_pages_slow is a better choice.
>
> In my previous thoughts, the QPl software path(or slow path) is only used
> for the unit test and I did not consider fallback to the software path when
> migration start because the QPL software path has no advantage over zstd.
> So when the work queue is full, always try to resubmit the job instead of 
> fallback software path.
>
> I now realize that when IAA hardware throughput is the bottleneck(the work 
> queue is full),
> switching to software may also increase performance.
>
> I will implement the automatically falling back to the software when IAA work
> queue is full into the next version and update the performance data. With the
> increase in mutlfd migration threads, the performance will be better than now.
> Currently, 4 threads will reach the IAA device throughput bottleneck, then 
> increasing
> The number of threads does not increase any performance.
>
>> > +            status = qpl_execute_job(job);
>> > +            if (status == QPL_STS_OK) {
>> > +                multifd_qpl_build_packet(qpl, p, i, zbuf, job-
>> >total_out);
>> > +            } else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
>> > +                /* compressed length exceeds page size, send page
>> directly */
>> > +                multifd_qpl_build_packet(qpl, p, i, host + pages-
>> >offset[i],
>> > +                                         p->page_size);
>> > +            } else {
>> > +                error_setg(errp, "multifd %u: qpl_execute_job
>> error %d",
>> > +                           p->id, status);
>> > +                return -1;
>> > +            }
>> > +            zbuf += p->page_size;
>> > +            continue;
>> > +        }
>> > +retry:
>> > +        status = qpl_submit_job(job);
>> > +        if (status == QPL_STS_OK) {
>> > +            zbuf += p->page_size;
>> > +        } else if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
>> > +            goto retry;
>> 
>> A retry count here would be nice.
>
> As the previous comment, I will switch to multifd_qpl_compress_pages_slow
> When the work queue is full, I will give a performance comparison between
> hardware only and software fallback next version.
>
>> > +        } else {
>> > +            error_setg(errp, "multifd %u: qpl_submit_job failed with
>> error %d",
>> > +                       p->id, status);
>> > +            return -1;
>> > +        }
>> > +    }
>> > +    if (!qpl->iaa_avail) {
>> > +        goto done;
>> > +    }
>> > +    /* wait all jobs to complete for hardware(IAA) path */
>> > +    for (int i = 0; i < job_num; i++) {
>> > +        job = qpl->job_array[i];
>> > +        status = qpl_wait_job(job);
>> > +        if (status == QPL_STS_OK) {
>> > +            multifd_qpl_build_packet(qpl, p, i, qpl->zbuf + (p-
>> >page_size * i),
>> > +                                     job->total_out);
>> > +        } else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
>> > +            /* compressed data length exceeds page size, send page
>> directly */
>> > +            multifd_qpl_build_packet(qpl, p, i, host + pages-
>> >offset[i],
>> > +                                     p->page_size);
>> > +        } else {
>> > +            error_setg(errp, "multifd %u: qpl_wait_job failed with
>> error %d",
>> > +                       p->id, status);
>> > +            return -1;
>> > +        }
>> > +    }
>> > +done:
>> > +    return 0;
>> > +}
>> > +
>> >  /**
>> >   * multifd_qpl_send_prepare: prepare data to be able to send
>> >   *
>> > @@ -217,8 +351,28 @@ static void
>> multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
>> >   */
>> >  static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error **errp)
>> >  {
>> > -    /* Implement in next patch */
>> > -    return -1;
>> > +    QplData *qpl = p->compress_data;
>> > +    uint32_t hdr_size;
>> > +
>> > +    if (!multifd_send_prepare_common(p)) {
>> > +        goto out;
>> > +    }
>> > +
>> > +    assert(p->pages->normal_num <= qpl->total_job_num);
>> > +    hdr_size = p->pages->normal_num * sizeof(uint32_t);
>> > +    /* prepare the header that stores the lengths of all compressed
>> data */
>> > +    p->iov[1].iov_base = (uint8_t *) qpl->zbuf_hdr;
>> > +    p->iov[1].iov_len = hdr_size;
>> 
>> Better use p->iovs_num here in case we ever decide to add more stuff to
>> the front of the array.
>
> Get it, I will fix this next version.
>  
>> > +    p->iovs_num++;
>> > +    p->next_packet_size += hdr_size;
>> 
>> Here's the first time we're setting this value, right? So just a regular
>> attribution(=).
>
> Yes, I will fix this next version.
>
>> > +    if (multifd_qpl_compress_pages(p, errp) != 0) {
>> > +        return -1;
>> > +    }
>> > +
>> > +out:
>> > +    p->flags |= MULTIFD_FLAG_QPL;
>> > +    multifd_send_fill_packet(p);
>> > +    return 0;
>> >  }
>> >
>> >  /**
>> > @@ -256,6 +410,88 @@ static void
>> multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
>> >      p->compress_data = NULL;
>> >  }
>> >
>> > +/**
>> > + * multifd_qpl_decompress_pages: decompress normal pages
>> > + *
>> > + * Each compressed page will be decompressed independently, and the
>> > + * decompression jobs will be submitted to the IAA hardware in non-
>> blocking
>> > + * mode, waiting for all jobs to be completed and loading the
>> decompressed
>> > + * data into guest memory. If IAA device is not available, the software
>> path
>> > + * is used.
>> > + *
>> > + * Returns 0 for success or -1 for error
>> > + *
>> > + * @p: Params for the channel that we are using
>> > + * @errp: pointer to an error
>> > + */
>> > +static int multifd_qpl_decompress_pages(MultiFDRecvParams *p, Error
>> **errp)
>> > +{
>> > +    qpl_status status;
>> > +    qpl_job *job;
>> > +    QplData *qpl = p->compress_data;
>> > +    uint32_t job_num = p->normal_num;
>> > +    uint32_t off = 0;
>> > +
>> > +    assert(job_num <= qpl->total_job_num);
>> > +    /* submit all decompression jobs */
>> > +    for (int i = 0; i < job_num; i++) {
>> > +        /* if the data size is the same as the page size, load it
>> directly */
>> > +        if (qpl->zbuf_hdr[i] == p->page_size) {
>> > +            memcpy(p->host + p->normal[i], qpl->zbuf + off, p-
>> >page_size);
>> > +            off += p->page_size;
>> > +            continue;
>> > +        }
>> > +        job = qpl->job_array[i];
>> > +        multifd_qpl_prepare_job(job, false, qpl->zbuf + off, qpl-
>> >zbuf_hdr[i],
>> > +                                p->host + p->normal[i], p->page_size);
>> > +        /* if hardware path(IAA) is unavailable, call the software path
>> */
>> > +        if (!qpl->iaa_avail) {
>> > +            status = qpl_execute_job(job);
>> > +            if (status == QPL_STS_OK) {
>> > +                off += qpl->zbuf_hdr[i];
>> > +                continue;
>> > +            }
>> > +            error_setg(errp, "multifd %u: qpl_execute_job failed with
>> error %d",
>> > +                       p->id, status);
>> > +            return -1;
>> > +        }
>> > +retry:
>> > +        status = qpl_submit_job(job);
>> > +        if (status == QPL_STS_OK) {
>> > +            off += qpl->zbuf_hdr[i];
>> > +        } else if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
>> > +            goto retry;
>> > +        } else {
>> > +            error_setg(errp, "multifd %u: qpl_submit_job failed with
>> error %d",
>> > +                       p->id, status);
>> > +            return -1;
>> > +        }
>> > +    }
>> > +    if (!qpl->iaa_avail) {
>> > +        goto done;
>> > +    }
>> > +    /* wait all jobs to complete for hardware(IAA) path */
>> > +    for (int i = 0; i < job_num; i++) {
>> > +        if (qpl->zbuf_hdr[i] == p->page_size) {
>> > +            continue;
>> > +        }
>> > +        job = qpl->job_array[i];
>> > +        status = qpl_wait_job(job);
>> > +        if (status != QPL_STS_OK) {
>> > +            error_setg(errp, "multifd %u: qpl_wait_job failed with
>> error %d",
>> > +                       p->id, status);
>> > +            return -1;
>> > +        }
>> > +        if (job->total_out != p->page_size) {
>> > +            error_setg(errp, "multifd %u: decompressed len %u, expected
>> len %u",
>> > +                       p->id, job->total_out, p->page_size);
>> > +            return -1;
>> > +        }
>> > +    }
>> > +done:
>> > +    return 0;
>> > +}
>> > +
>> >  /**
>> >   * multifd_qpl_recv: read the data from the channel into actual pages
>> >   *
>> > @@ -269,8 +505,48 @@ static void
>> multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
>> >   */
>> >  static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
>> >  {
>> > -    /* Implement in next patch */
>> > -    return -1;
>> > +    QplData *qpl = p->compress_data;
>> > +    uint32_t in_size = p->next_packet_size;
>> > +    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
>> > +    uint32_t hdr_len = p->normal_num * sizeof(uint32_t);
>> > +    uint32_t data_len = 0;
>> > +    int ret;
>> > +
>> > +    if (flags != MULTIFD_FLAG_QPL) {
>> > +        error_setg(errp, "multifd %u: flags received %x flags
>> expected %x",
>> > +                   p->id, flags, MULTIFD_FLAG_QPL);
>> > +        return -1;
>> > +    }
>> > +    multifd_recv_zero_page_process(p);
>> > +    if (!p->normal_num) {
>> > +        assert(in_size == 0);
>> > +        return 0;
>> > +    }
>> > +
>> > +    /* read compressed data lengths */
>> > +    assert(hdr_len < in_size);
>> > +    ret = qio_channel_read_all(p->c, (void *) qpl->zbuf_hdr, hdr_len,
>> errp);
>> > +    if (ret != 0) {
>> > +        return ret;
>> > +    }
>> > +    assert(p->normal_num <= qpl->total_job_num);
>> 
>> I'm still in doubt whether we should use p->page_count directly all
>> over. It's nice to move the concept into the QPL domain space, but it
>> makes less sense in these functions that take MultiFD*Params as
>> argument.
>
> I don't understand what you mean here. Do you plan to remove page_count from 
> MultiFD*Params
> and provide a new API to get the migrated page count?
>

No, I mean that qpl->total_job_num == p->page_count, so we could use
p->page_count in the functions that have MultiFDParams available. Maybe
even drop total_job_num altogether. But I'm debating if it is worth it,
because that makes the code more coupled to multifd and we may not want
that. Let's leave it for now.

>> > +    for (int i = 0; i < p->normal_num; i++) {
>> > +        qpl->zbuf_hdr[i] = be32_to_cpu(qpl->zbuf_hdr[i]);
>> > +        data_len += qpl->zbuf_hdr[i];
>> > +        assert(qpl->zbuf_hdr[i] <= p->page_size);
>> > +    }
>> > +
>> > +    /* read compressed data */
>> > +    assert(in_size == hdr_len + data_len);
>> > +    ret = qio_channel_read_all(p->c, (void *) qpl->zbuf, data_len,
>> errp);
>> > +    if (ret != 0) {
>> > +        return ret;
>> > +    }
>> > +
>> > +    if (multifd_qpl_decompress_pages(p, errp) != 0) {
>> > +        return -1;
>> > +    }
>> > +    return 0;
>> >  }
>> >
>> >  static MultiFDMethods multifd_qpl_ops = {

Reply via email to