On 04/20/2017 06:01 AM, Michal Privoznik wrote:
> One big downside of using the pipe to transfer the data is that
> we can really transfer just bare data. No metadata can be carried
> through unless some formatted messages are introduced. That would
> be quite painful to achieve so let's use a message queue. It's
> fairly easy to exchange info between threads now that iohelper is
> no longer used.
>
> The reason why we cannot use the FD for plain files directly is
> that despite us setting noblock flag on the FD, any
> read()/write() blocks regardless (which is a show stopper since
> those parts of the code are run from the event loop) and poll()
> reports such FD as always readable/writable - even though the
> subsequent operation might block.
>
> The pipe is still not gone though. It is used to signal to the
> event loop that an event occurred (e.g. data are available for
> reading in the queue, or vice versa).
>
> Signed-off-by: Michal Privoznik <[email protected]>
> ---
> src/util/virfdstream.c | 402
> ++++++++++++++++++++++++++++++++++++++++++-------
> 1 file changed, 350 insertions(+), 52 deletions(-)
>
Very strange - compilation breaks on this patch:
util/virfdstream.c: In function 'virFDStreamThread':
util/virfdstream.c:551:15: error: 'got' may be used uninitialized in
this function [-Werror=maybe-uninitialized]
total += got;
^~
The reason this happens is that virFDStreamThreadDoWrite doesn't
initialize got, so its return value is indeterminate if msg->type is not
VIR_FDSTREAM_MSG_TYPE_DATA.
Wish the complaint was in the right function...
Before I forget... starting here - I'm perhaps a bit "nervous" about the
claim from your ping that "Patches 01-07 are fairly trivial and are more
of a bug fixes than feature implementation"... I'd almost say 1-4 are
trivial, 5 is a little less trivial since you're "removing" the
iohelper command and replacing it "inline", but this one has moved into
altering more of the algorithm. So close to a release...
I see Daniel has responded to this one too ... still I'll point out a
few more things...
> diff --git a/src/util/virfdstream.c b/src/util/virfdstream.c
> index 7a8d65d..0350494 100644
> --- a/src/util/virfdstream.c
> +++ b/src/util/virfdstream.c
> @@ -49,6 +49,27 @@
>
> VIR_LOG_INIT("fdstream");
>
> +typedef enum {
> + VIR_FDSTREAM_MSG_TYPE_DATA,
> +} virFDStreamMsgType;
> +
> +typedef struct _virFDStreamMsg virFDStreamMsg;
> +typedef virFDStreamMsg *virFDStreamMsgPtr;
> +struct _virFDStreamMsg {
> + virFDStreamMsgPtr next;
> +
> + virFDStreamMsgType type;
> +
> + union {
> + struct {
> + char *buf;
> + size_t len;
> + size_t offset;
> + } data;
> + } stream;
> +};
> +
> +
> /* Tunnelled migration stream support */
> typedef struct virFDStreamData virFDStreamData;
> typedef virFDStreamData *virFDStreamDataPtr;
> @@ -80,18 +101,25 @@ struct virFDStreamData {
>
> /* Thread data */
> virThreadPtr thread;
> + virCond threadCond;
> int threadErr;
> bool threadQuit;
> + bool threadAbort;
> + bool threadDoRead;
> + virFDStreamMsgPtr msg;
> };
>
> static virClassPtr virFDStreamDataClass;
>
> +static void virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue);
> +
> static void
> virFDStreamDataDispose(void *obj)
> {
> virFDStreamDataPtr fdst = obj;
>
> VIR_DEBUG("obj=%p", fdst);
> + virFDStreamMsgQueueFree(&fdst->msg);
> }
>
> static int virFDStreamDataOnceInit(void)
> @@ -108,6 +136,89 @@ static int virFDStreamDataOnceInit(void)
> VIR_ONCE_GLOBAL_INIT(virFDStreamData)
>
>
> +static int
> +virFDStreamMsgQueuePush(virFDStreamDataPtr fdst,
> + virFDStreamMsgPtr msg,
> + int fd,
> + const char *fdname)
> +{
> + virFDStreamMsgPtr *tmp = &fdst->msg;
> + char c = '1';
> +
> + while (*tmp)
> + tmp = &(*tmp)->next;
> +
> + *tmp = msg;
> + virCondSignal(&fdst->threadCond);
> +
> + if (safewrite(fd, &c, sizeof(c)) != sizeof(c)) {
> + virReportSystemError(errno,
> + _("Unable to write to %s"),
> + fdname);
> + return -1;
> + }
> +
> + return 0;
> +}
> +
> +
> +static virFDStreamMsgPtr
> +virFDStreamMsgQueuePop(virFDStreamDataPtr fdst,
> + int fd,
> + const char *fdname)
> +{
> + virFDStreamMsgPtr tmp = fdst->msg;
> + char c;
> +
> + if (tmp) {
> + fdst->msg = tmp->next;
> + tmp->next = NULL;
> + }
> +
> + virCondSignal(&fdst->threadCond);
> +
> + if (saferead(fd, &c, sizeof(c)) != sizeof(c)) {
> + virReportSystemError(errno,
> + _("Unable to read from %s"),
> + fdname);
> + return NULL;
> + }
> +
> + return tmp;
> +}
> +
> +
> +static void
> +virFDStreamMsgFree(virFDStreamMsgPtr msg)
> +{
> + if (!msg)
> + return;
> +
> + switch (msg->type) {
> + case VIR_FDSTREAM_MSG_TYPE_DATA:
> + VIR_FREE(msg->stream.data.buf);
> + break;
> + }
> +
> + VIR_FREE(msg);
> +}
> +
> +
> +static void
> +virFDStreamMsgQueueFree(virFDStreamMsgPtr *queue)
> +{
> + virFDStreamMsgPtr tmp = *queue;
> +
> + while (tmp) {
> + virFDStreamMsgPtr next = tmp->next;
> + virFDStreamMsgFree(tmp);
> + tmp = next;
> + }
> +
> + *queue = NULL;
> +}
> +
> +
> static int virFDStreamRemoveCallback(virStreamPtr stream)
> {
> virFDStreamDataPtr fdst = stream->privateData;
> @@ -273,6 +384,7 @@ typedef virFDStreamThreadData *virFDStreamThreadDataPtr;
> struct _virFDStreamThreadData {
> virStreamPtr st;
> size_t length;
> + bool doRead;
> int fdin;
> char *fdinname;
> int fdout;
> @@ -293,6 +405,86 @@ virFDStreamThreadDataFree(virFDStreamThreadDataPtr data)
> }
>
>
> +static ssize_t
> +virFDStreamThreadDoRead(virFDStreamDataPtr fdst,
> + const int fdin,
> + const int fdout,
> + const char *fdinname,
> + const char *fdoutname,
> + size_t buflen)
> +{
> + virFDStreamMsgPtr msg = NULL;
> + char *buf = NULL;
> + ssize_t got;
got = -1;
Not really required yet, but if additional code gets added...
> +
> + if (VIR_ALLOC(msg) < 0)
> + goto error;
> +
> + if (VIR_ALLOC_N(buf, buflen) < 0)
> + goto error;
> +
> + if ((got = saferead(fdin, buf, buflen)) < 0) {
> + virReportSystemError(errno,
> + _("Unable to read %s"),
> + fdinname);
> + goto error;
> + }
> +
> + msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
> + msg->stream.data.buf = buf;
Could also go with VIR_STEAL_PTR(msg->stream.data.buf, buf);
thus avoiding the buf = NULL below.
> + msg->stream.data.len = got;
> + buf = NULL;
> +
> + virFDStreamMsgQueuePush(fdst, msg, fdout, fdoutname);
> + msg = NULL;
*QueuePush does not return void, yet its return value is ignored here. What happens if safewrite fails?
> +
> + return got;
> +
> + error:
> + VIR_FREE(buf);
> + virFDStreamMsgFree(msg);
> + return -1;
> +}
> +
> +
> +static ssize_t
> +virFDStreamThreadDoWrite(virFDStreamDataPtr fdst,
> + const int fdin,
> + const int fdout,
> + const char *fdinname,
> + const char *fdoutname)
> +{
> + ssize_t got;
got = -1
is required here since got is indeterminate if msg->type != TYPE_DATA
> + virFDStreamMsgPtr msg = fdst->msg;
> + bool pop = false;
> +
> + switch (msg->type) {
> + case VIR_FDSTREAM_MSG_TYPE_DATA:
> + got = safewrite(fdout,
> + msg->stream.data.buf + msg->stream.data.offset,
> + msg->stream.data.len - msg->stream.data.offset);
> + if (got < 0) {
> + virReportSystemError(errno,
> + _("Unable to write %s"),
> + fdoutname);
> + return -1;
> + }
> +
> + msg->stream.data.offset += got;
> +
> + pop = msg->stream.data.offset == msg->stream.data.len;
> + break;
> + }
> +
> + if (pop) {
> + virFDStreamMsgQueuePop(fdst, fdin, fdinname);
*QueuePop does not return void, yet its return value is ignored here... What if saferead fails?
> + virFDStreamMsgFree(msg);
> + }
> +
> + return got;
> +}
> +
> +
> static void
> virFDStreamThread(void *opaque)
> {
> @@ -304,14 +496,12 @@ virFDStreamThread(void *opaque)
> int fdout = data->fdout;
> char *fdoutname = data->fdoutname;
> virFDStreamDataPtr fdst = st->privateData;
> - char *buf = NULL;
> + bool doRead = fdst->threadDoRead;
> size_t buflen = 256 * 1024;
> size_t total = 0;
>
> virObjectRef(fdst);
> -
> - if (VIR_ALLOC_N(buf, buflen) < 0)
> - goto error;
> + virObjectLock(fdst);
>
> while (1) {
> ssize_t got;
> @@ -323,39 +513,56 @@ virFDStreamThread(void *opaque)
> if (buflen == 0)
> break; /* End of requested data from client */
>
> - if ((got = saferead(fdin, buf, buflen)) < 0) {
> - virReportSystemError(errno,
> - _("Unable to read %s"),
> - fdinname);
> + while (doRead == (fdst->msg != NULL) &&
> + !fdst->threadQuit) {
> + if (virCondWait(&fdst->threadCond, &fdst->parent.lock)) {
> + virReportSystemError(errno, "%s",
> + _("failed to wait on condition"));
> + goto error;
> + }
> + }
> +
> + if (fdst->threadQuit) {
> + /* If stream abort was requested, quit early. */
> + if (fdst->threadAbort)
> + goto cleanup;
> +
> + /* Otherwise flush buffers and quit gracefully. */
> + if (doRead == (fdst->msg != NULL))
> + break;
> + }
> +
> + if (doRead)
> + got = virFDStreamThreadDoRead(fdst,
> + fdin, fdout,
> + fdinname, fdoutname,
> + buflen);
> + else
> + got = virFDStreamThreadDoWrite(fdst,
> + fdin, fdout,
> + fdinname, fdoutname);
> +
> + if (got < 0)
> goto error;
> - }
>
> if (got == 0)
> break;
>
> total += got;
> -
> - if (safewrite(fdout, buf, got) < 0) {
> - virReportSystemError(errno,
> - _("Unable to write %s"),
> - fdoutname);
> - goto error;
> - }
> }
>
> cleanup:
> + fdst->threadQuit = true;
> + virObjectUnlock(fdst);
> if (!virObjectUnref(fdst))
> st->privateData = NULL;
> VIR_FORCE_CLOSE(fdin);
> VIR_FORCE_CLOSE(fdout);
> virFDStreamThreadDataFree(data);
> - VIR_FREE(buf);
> return;
>
> error:
> - virObjectLock(fdst);
> fdst->threadErr = errno;
> - virObjectUnlock(fdst);
> goto cleanup;
> }
>
> @@ -367,6 +574,10 @@ virFDStreamJoinWorker(virFDStreamDataPtr fdst, bool
> streamAbort)
> if (!fdst->thread)
> return 0;
>
> + fdst->threadAbort = streamAbort;
> + fdst->threadQuit = true;
> + virCondSignal(&fdst->threadCond);
> +
> /* Give the thread a chance to lock the FD stream object. */
> virObjectUnlock(fdst);
> virThreadJoin(fdst->thread);
> @@ -380,6 +591,7 @@ virFDStreamJoinWorker(virFDStreamDataPtr fdst, bool
> streamAbort)
> ret = 0;
> cleanup:
> VIR_FREE(fdst->thread);
> + virCondDestroy(&fdst->threadCond);
> return ret;
> }
>
> @@ -426,11 +638,14 @@ virFDStreamCloseInt(virStreamPtr st, bool streamAbort)
> fdst->abortCallbackDispatching = false;
> }
>
> - /* mutex locked */
> - ret = VIR_CLOSE(fdst->fd);
> if (virFDStreamJoinWorker(fdst, streamAbort) < 0)
> ret = -1;
>
> + /* mutex locked */
> + if ((ret = VIR_CLOSE(fdst->fd)) < 0)
> + virReportSystemError(errno, "%s",
> + _("Unable to close"));
> +
> st->privateData = NULL;
>
> /* call the internal stream closing callback */
> @@ -467,7 +682,8 @@ virFDStreamAbort(virStreamPtr st)
> static int virFDStreamWrite(virStreamPtr st, const char *bytes, size_t
> nbytes)
> {
> virFDStreamDataPtr fdst = st->privateData;
> - int ret;
> + virFDStreamMsgPtr msg = NULL;
> + int ret = -1;
>
> if (nbytes > INT_MAX) {
> virReportSystemError(ERANGE, "%s",
> @@ -495,25 +711,51 @@ static int virFDStreamWrite(virStreamPtr st, const char
> *bytes, size_t nbytes)
> nbytes = fdst->length - fdst->offset;
> }
>
> - retry:
> - ret = write(fdst->fd, bytes, nbytes);
> - if (ret < 0) {
> - VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
> - if (errno == EAGAIN || errno == EWOULDBLOCK) {
> - VIR_WARNINGS_RESET
> - ret = -2;
> - } else if (errno == EINTR) {
> - goto retry;
> - } else {
> - ret = -1;
> - virReportSystemError(errno, "%s",
> + if (fdst->thread) {
> + char *buf;
> +
> + if (fdst->threadQuit) {
> + virReportSystemError(EBADF, "%s",
> _("cannot write to stream"));
> + return -1;
either virObjectUnlock(fdst) before returning, or goto cleanup (otherwise fdst remains locked)
> + }
> +
> + if (VIR_ALLOC(msg) < 0 ||
> + VIR_ALLOC_N(buf, nbytes) < 0)
> + goto cleanup;
> +
> + memcpy(buf, bytes, nbytes);
> + msg->type = VIR_FDSTREAM_MSG_TYPE_DATA;
> + msg->stream.data.buf = buf;
> + msg->stream.data.len = nbytes;
> +
> + virFDStreamMsgQueuePush(fdst, msg, fdst->fd, "pipe");
*QueuePush does not return void, yet its return value is ignored here... What happens if safewrite fails?
> + msg = NULL;
> + ret = nbytes;
> + } else {
> + retry:
> + ret = write(fdst->fd, bytes, nbytes);
> + if (ret < 0) {
> + VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
> + if (errno == EAGAIN || errno == EWOULDBLOCK) {
> + VIR_WARNINGS_RESET
> + ret = -2;
> + } else if (errno == EINTR) {
> + goto retry;
> + } else {
> + ret = -1;
> + virReportSystemError(errno, "%s",
> + _("cannot write to stream"));
> + }
Should there be a goto cleanup here so that we avoid any chance that
fdst->length > 0 and thus fdst->offset gets decremented by 1 or 2? Or
is that just me thinking it could happen without actually checking
whether fdst->length could be non-zero here?
> }
> - } else if (fdst->length) {
> - fdst->offset += ret;
> }
>
> + if (fdst->length)
> + fdst->offset += ret;
> +
> + cleanup:
> virObjectUnlock(fdst);
> + virFDStreamMsgFree(msg);
> return ret;
> }
>
> @@ -521,7 +763,7 @@ static int virFDStreamWrite(virStreamPtr st, const char
> *bytes, size_t nbytes)
> static int virFDStreamRead(virStreamPtr st, char *bytes, size_t nbytes)
> {
> virFDStreamDataPtr fdst = st->privateData;
> - int ret;
> + int ret = -1;
>
> if (nbytes > INT_MAX) {
> virReportSystemError(ERANGE, "%s",
> @@ -547,24 +789,70 @@ static int virFDStreamRead(virStreamPtr st, char
> *bytes, size_t nbytes)
> nbytes = fdst->length - fdst->offset;
> }
>
> - retry:
> - ret = read(fdst->fd, bytes, nbytes);
> - if (ret < 0) {
> - VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
> - if (errno == EAGAIN || errno == EWOULDBLOCK) {
> - VIR_WARNINGS_RESET
> - ret = -2;
> - } else if (errno == EINTR) {
> - goto retry;
> - } else {
> - ret = -1;
> - virReportSystemError(errno, "%s",
> - _("cannot read from stream"));
> + if (fdst->thread) {
> + virFDStreamMsgPtr msg = NULL;
> +
> + while (!(msg = fdst->msg)) {
> + if (fdst->threadQuit) {
> + if (nbytes) {
> + virReportSystemError(EBADF, "%s",
> + _("stream is not open"));
> + } else {
> + ret = 0;
> + }
> + goto cleanup;
> + } else {
> + virObjectUnlock(fdst);
> + virCondSignal(&fdst->threadCond);
> + virObjectLock(fdst);
> + }
> + }
> +
> + if (msg->type != VIR_FDSTREAM_MSG_TYPE_DATA) {
> + /* Nope, nope, I'm outta here */
> + virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
> + _("unexpected message type"));
> + goto cleanup;
> + }
> +
> + if (nbytes > msg->stream.data.len - msg->stream.data.offset)
> + nbytes = msg->stream.data.len - msg->stream.data.offset;
> +
> + memcpy(bytes,
> + msg->stream.data.buf + msg->stream.data.offset,
> + nbytes);
> +
> + msg->stream.data.offset += nbytes;
> + if (msg->stream.data.offset == msg->stream.data.len) {
> + virFDStreamMsgQueuePop(fdst, fdst->fd, "pipe");
*QueuePop does not return void, yet its return value is ignored here... What if saferead fails?
> + virFDStreamMsgFree(msg);
> + }
> +
> + ret = nbytes;
> +
> + } else {
> + retry:
> + ret = read(fdst->fd, bytes, nbytes);
> + if (ret < 0) {
> + VIR_WARNINGS_NO_WLOGICALOP_EQUAL_EXPR
> + if (errno == EAGAIN || errno == EWOULDBLOCK) {
> + VIR_WARNINGS_RESET
> + ret = -2;
> + } else if (errno == EINTR) {
> + goto retry;
> + } else {
> + ret = -1;
> + virReportSystemError(errno, "%s",
> + _("cannot read from stream"));
> + }
> + goto cleanup;
I think I know the answer to my question from virFDStreamWrite now..
John
> }
> - } else if (fdst->length) {
> - fdst->offset += ret;
> }
>
> + if (fdst->length)
> + fdst->offset += ret;
> +
> + cleanup:
> virObjectUnlock(fdst);
> return ret;
> }
> @@ -609,11 +897,19 @@ static int virFDStreamOpenInternal(virStreamPtr st,
> st->privateData = fdst;
>
> if (threadData) {
> + fdst->threadDoRead = threadData->doRead;
> +
> /* Create the thread after fdst and st were initialized.
> * The thread worker expects them to be that way. */
> if (VIR_ALLOC(fdst->thread) < 0)
> goto error;
>
> + if (virCondInit(&fdst->threadCond) < 0) {
> + virReportSystemError(errno, "%s",
> + _("cannot initialize condition variable"));
> + goto error;
> + }
> +
> if (virThreadCreate(fdst->thread,
> true,
> virFDStreamThread,
> @@ -782,6 +1078,7 @@ virFDStreamOpenFileInternal(virStreamPtr st,
> VIR_STRDUP(threadData->fdoutname, "pipe") < 0)
> goto error;
> tmpfd = pipefds[0];
> + threadData->doRead = true;
> } else {
> threadData->fdin = pipefds[0];
> threadData->fdout = fd;
> @@ -789,6 +1086,7 @@ virFDStreamOpenFileInternal(virStreamPtr st,
> VIR_STRDUP(threadData->fdoutname, path) < 0)
> goto error;
> tmpfd = pipefds[1];
> + threadData->doRead = false;
> }
> }
>
>
--
libvir-list mailing list
[email protected]
https://www.redhat.com/mailman/listinfo/libvir-list